From ed04d94f6d7132055f97a2f757b85c03c5f2a0b6 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 6 Jan 2017 23:24:47 +0100 Subject: [PATCH] Server: try to have a better video integrity --- server/controllers/api/remote/videos.js | 56 ++++++++++-- server/controllers/api/videos.js | 114 ++++++++++++++++-------- server/helpers/utils.js | 16 +++- server/lib/friends.js | 66 +++++++++----- server/models/pod.js | 9 +- server/models/request.js | 4 +- 6 files changed, 194 insertions(+), 71 deletions(-) diff --git a/server/controllers/api/remote/videos.js b/server/controllers/api/remote/videos.js index d02da4463..6d768eae8 100644 --- a/server/controllers/api/remote/videos.js +++ b/server/controllers/api/remote/videos.js @@ -10,6 +10,7 @@ const secureMiddleware = middlewares.secure const videosValidators = middlewares.validators.remote.videos const signatureValidators = middlewares.validators.remote.signature const logger = require('../../../helpers/logger') +const utils = require('../../../helpers/utils') const router = express.Router() @@ -37,11 +38,11 @@ function remoteVideos (req, res, next) { switch (request.type) { case 'add': - addRemoteVideo(data, fromPod, callbackEach) + addRemoteVideoRetryWrapper(data, fromPod, callbackEach) break case 'update': - updateRemoteVideo(data, fromPod, callbackEach) + updateRemoteVideoRetryWrapper(data, fromPod, callbackEach) break case 'remove': @@ -63,13 +64,30 @@ function remoteVideos (req, res, next) { return res.type('json').status(204).end() } +// Handle retries on fail +function addRemoteVideoRetryWrapper (videoToCreateData, fromPod, finalCallback) { + utils.transactionRetryer( + function (callback) { + return addRemoteVideo(videoToCreateData, fromPod, callback) + }, + function (err) { + if (err) { + logger.error('Cannot insert the remote video with many retries.', { error: err }) + return finalCallback(err) + } + + return finalCallback() + } + ) +} + function addRemoteVideo (videoToCreateData, fromPod, finalCallback) { - logger.debug('Adding remote video "%s".', videoToCreateData.name) + logger.debug('Adding remote video "%s".', videoToCreateData.remoteId) waterfall([ function startTransaction (callback) { - db.sequelize.transaction().asCallback(function (err, t) { + db.sequelize.transaction({ isolationLevel: 'SERIALIZABLE' }).asCallback(function (err, t) { return callback(err, t) }) }, @@ -103,6 +121,7 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) { authorId: author.id, duration: videoToCreateData.duration, createdAt: videoToCreateData.createdAt, + // FIXME: updatedAt does not seems to be considered by Sequelize updatedAt: videoToCreateData.updatedAt } @@ -142,7 +161,8 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) { ], function (err, t) { if (err) { - logger.error('Cannot insert the remote video.') + // This is just a debug because we will retry the insert + logger.debug('Cannot insert the remote video.', { error: err }) // Abort transaction? if (t) t.rollback() @@ -157,8 +177,25 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) { }) } +// Handle retries on fail +function updateRemoteVideoRetryWrapper (videoAttributesToUpdate, fromPod, finalCallback) { + utils.transactionRetryer( + function (callback) { + return updateRemoteVideo(videoAttributesToUpdate, fromPod, callback) + }, + function (err) { + if (err) { + logger.error('Cannot update the remote video with many retries.', { error: err }) + return finalCallback(err) + } + + return finalCallback() + } + ) +} + function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) { - logger.debug('Updating remote video "%s".', videoAttributesToUpdate.name) + logger.debug('Updating remote video "%s".', videoAttributesToUpdate.remoteId) waterfall([ @@ -208,7 +245,8 @@ function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) { ], function (err, t) { if (err) { - logger.error('Cannot update the remote video.') + // This is just a debug because we will retry the insert + logger.debug('Cannot update the remote video.', { error: err }) // Abort transaction? if (t) t.rollback() @@ -238,7 +276,7 @@ function reportAbuseRemoteVideo (reportData, fromPod, callback) { if (err || !video) { if (!err) err = new Error('video not found') - logger.error('Cannot load video from host and remote id.', { error: err }) + logger.error('Cannot load video from id.', { error: err, id: reportData.videoRemoteId }) return callback(err) } @@ -260,7 +298,7 @@ function fetchVideo (podHost, remoteId, callback) { if (err || !video) { if (!err) err = new Error('video not found') - logger.error('Cannot load video from host and remote id.', { error: err }) + logger.error('Cannot load video from host and remote id.', { error: err, podHost, remoteId }) return callback(err) } diff --git a/server/controllers/api/videos.js b/server/controllers/api/videos.js index 6829804ec..4d45c11c0 100644 --- a/server/controllers/api/videos.js +++ b/server/controllers/api/videos.js @@ -70,13 +70,13 @@ router.put('/:id', oAuth.authenticate, reqFiles, validatorsVideos.videosUpdate, - updateVideo + updateVideoRetryWrapper ) router.post('/', oAuth.authenticate, reqFiles, validatorsVideos.videosAdd, - addVideo + addVideoRetryWrapper ) router.get('/:id', validatorsVideos.videosGet, @@ -103,19 +103,37 @@ module.exports = router // --------------------------------------------------------------------------- -function addVideo (req, res, next) { - const videoFile = req.files.videofile[0] +// Wrapper to video add that retry the function if there is a database error +// We need this because we run the transaction in SERIALIZABLE isolation that can fail +function addVideoRetryWrapper (req, res, next) { + utils.transactionRetryer( + function (callback) { + return addVideo(req, res, req.files.videofile[0], callback) + }, + function (err) { + if (err) { + logger.error('Cannot insert the video with many retries.', { error: err }) + return next(err) + } + + // TODO : include Location of the new video -> 201 + return res.type('json').status(204).end() + } + ) +} + +function addVideo (req, res, videoFile, callback) { const videoInfos = req.body waterfall([ - function startTransaction (callback) { - db.sequelize.transaction().asCallback(function (err, t) { - return callback(err, t) + function startTransaction (callbackWaterfall) { + db.sequelize.transaction({ isolationLevel: 'SERIALIZABLE' }).asCallback(function (err, t) { + return callbackWaterfall(err, t) }) }, - function findOrCreateAuthor (t, callback) { + function findOrCreateAuthor (t, callbackWaterfall) { const user = res.locals.oauth.token.User const name = user.username @@ -124,19 +142,19 @@ function addVideo (req, res, next) { const userId = user.id db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) { - return callback(err, t, authorInstance) + return callbackWaterfall(err, t, authorInstance) }) }, - function findOrCreateTags (t, author, callback) { + function findOrCreateTags (t, author, callbackWaterfall) { const tags = videoInfos.tags db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) { - return callback(err, t, author, tagInstances) + return callbackWaterfall(err, t, author, tagInstances) }) }, - function createVideoObject (t, author, tagInstances, callback) { + function createVideoObject (t, author, tagInstances, callbackWaterfall) { const videoData = { name: videoInfos.name, remoteId: null, @@ -148,74 +166,97 @@ function addVideo (req, res, next) { const video = db.Video.build(videoData) - return callback(null, t, author, tagInstances, video) + return callbackWaterfall(null, t, author, tagInstances, video) }, // Set the videoname the same as the id - function renameVideoFile (t, author, tagInstances, video, callback) { + function renameVideoFile (t, author, tagInstances, video, callbackWaterfall) { const videoDir = constants.CONFIG.STORAGE.VIDEOS_DIR const source = path.join(videoDir, videoFile.filename) const destination = path.join(videoDir, video.getVideoFilename()) fs.rename(source, destination, function (err) { - return callback(err, t, author, tagInstances, video) + if (err) return callbackWaterfall(err) + + // This is important in case if there is another attempt + videoFile.filename = video.getVideoFilename() + return callbackWaterfall(null, t, author, tagInstances, video) }) }, - function insertVideoIntoDB (t, author, tagInstances, video, callback) { + function insertVideoIntoDB (t, author, tagInstances, video, callbackWaterfall) { const options = { transaction: t } // Add tags association video.save(options).asCallback(function (err, videoCreated) { - if (err) return callback(err) + if (err) return callbackWaterfall(err) // Do not forget to add Author informations to the created video videoCreated.Author = author - return callback(err, t, tagInstances, videoCreated) + return callbackWaterfall(err, t, tagInstances, videoCreated) }) }, - function associateTagsToVideo (t, tagInstances, video, callback) { + function associateTagsToVideo (t, tagInstances, video, callbackWaterfall) { const options = { transaction: t } video.setTags(tagInstances, options).asCallback(function (err) { video.Tags = tagInstances - return callback(err, t, video) + return callbackWaterfall(err, t, video) }) }, - function sendToFriends (t, video, callback) { + function sendToFriends (t, video, callbackWaterfall) { video.toAddRemoteJSON(function (err, remoteVideo) { - if (err) return callback(err) + if (err) return callbackWaterfall(err) // Now we'll add the video's meta data to our friends - friends.addVideoToFriends(remoteVideo) - - return callback(null, t) + friends.addVideoToFriends(remoteVideo, t, function (err) { + return callbackWaterfall(err, t) + }) }) } ], function andFinally (err, t) { if (err) { - logger.error('Cannot insert the video.') + // This is just a debug because we will retry the insert + logger.debug('Cannot insert the video.', { error: err }) // Abort transaction? if (t) t.rollback() - return next(err) + return callback(err) } // Commit transaction t.commit() - // TODO : include Location of the new video -> 201 - return res.type('json').status(204).end() + logger.info('Video with name %s created.', videoInfos.name) + + return callback(null) }) } -function updateVideo (req, res, next) { +function updateVideoRetryWrapper (req, res, next) { + utils.transactionRetryer( + function (callback) { + return updateVideo(req, res, callback) + }, + function (err) { + if (err) { + logger.error('Cannot update the video with many retries.', { error: err }) + return next(err) + } + + // TODO : include Location of the new video -> 201 + return res.type('json').status(204).end() + } + ) +} + +function updateVideo (req, res, finalCallback) { const videoInstance = res.locals.video const videoInfosToUpdate = req.body @@ -267,26 +308,25 @@ function updateVideo (req, res, next) { const json = videoInstance.toUpdateRemoteJSON() // Now we'll update the video's meta data to our friends - friends.updateVideoToFriends(json) - - return callback(null, t) + friends.updateVideoToFriends(json, t, function (err) { + return callback(err, t) + }) } ], function andFinally (err, t) { if (err) { - logger.error('Cannot insert the video.') + logger.debug('Cannot update the video.', { error: err }) // Abort transaction? if (t) t.rollback() - return next(err) + return finalCallback(err) } // Commit transaction t.commit() - // TODO : include Location of the new video -> 201 - return res.type('json').status(204).end() + return finalCallback(null) }) } diff --git a/server/helpers/utils.js b/server/helpers/utils.js index 9f4b14582..a902850cd 100644 --- a/server/helpers/utils.js +++ b/server/helpers/utils.js @@ -1,6 +1,7 @@ 'use strict' const crypto = require('crypto') +const retry = require('async/retry') const logger = require('./logger') @@ -9,7 +10,8 @@ const utils = { cleanForExit, generateRandomString, isTestInstance, - getFormatedObjects + getFormatedObjects, + transactionRetryer } function badRequest (req, res, next) { @@ -46,6 +48,18 @@ function getFormatedObjects (objects, objectsTotal) { } } +function transactionRetryer (func, callback) { + retry({ + times: 5, + + errorFilter: function (err) { + const willRetry = (err.name === 'SequelizeDatabaseError') + logger.debug('Maybe retrying the transaction function.', { willRetry }) + return willRetry + } + }, func, callback) +} + // --------------------------------------------------------------------------- module.exports = utils diff --git a/server/lib/friends.js b/server/lib/friends.js index 4afb91b8b..3d3d0fdee 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -24,16 +24,33 @@ const friends = { sendOwnedVideosToPod } -function addVideoToFriends (videoData) { - createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, videoData) +function addVideoToFriends (videoData, transaction, callback) { + const options = { + type: 'add', + endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, + data: videoData, + transaction + } + createRequest(options, callback) } -function updateVideoToFriends (videoData) { - createRequest('update', constants.REQUEST_ENDPOINTS.VIDEOS, videoData) +function updateVideoToFriends (videoData, transaction, callback) { + const options = { + type: 'update', + endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, + data: videoData, + transaction + } + createRequest(options, callback) } function removeVideoToFriends (videoParams) { - createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams) + const options = { + type: 'remove', + endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, + data: videoParams + } + createRequest(options) } function reportAbuseVideoToFriend (reportData, video) { @@ -258,25 +275,35 @@ function makeRequestsToWinningPods (cert, podsList, callback) { } // Wrapper that populate "toIds" argument with all our friends if it is not specified -function createRequest (type, endpoint, data, toIds) { - if (toIds) return _createRequest(type, endpoint, data, toIds) +// { type, endpoint, data, toIds, transaction } +function createRequest (options, callback) { + if (!callback) callback = function () {} + if (options.toIds) return _createRequest(options, callback) // If the "toIds" pods is not specified, we send the request to all our friends - db.Pod.listAllIds(function (err, podIds) { + db.Pod.listAllIds(options.transaction, function (err, podIds) { if (err) { logger.error('Cannot get pod ids', { error: err }) return } - return _createRequest(type, endpoint, data, podIds) + const newOptions = Object.assign(options, { toIds: podIds }) + return _createRequest(newOptions, callback) }) } -function _createRequest (type, endpoint, data, toIds) { +// { type, endpoint, data, toIds, transaction } +function _createRequest (options, callback) { + const type = options.type + const endpoint = options.endpoint + const data = options.data + const toIds = options.toIds + const transaction = options.transaction + const pods = [] // If there are no destination pods abort - if (toIds.length === 0) return + if (toIds.length === 0) return callback(null) toIds.forEach(function (toPod) { pods.push(db.Pod.build({ id: toPod })) @@ -290,17 +317,14 @@ function _createRequest (type, endpoint, data, toIds) { } } - // We run in transaction to keep coherency between Request and RequestToPod tables - db.sequelize.transaction(function (t) { - const dbRequestOptions = { - transaction: t - } + const dbRequestOptions = { + transaction + } - return db.Request.create(createQuery, dbRequestOptions).then(function (request) { - return request.setPods(pods, dbRequestOptions) - }) - }).asCallback(function (err) { - if (err) logger.error('Error in createRequest transaction.', { error: err }) + return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) { + if (err) return callback(err) + + return request.setPods(pods, dbRequestOptions).asCallback(callback) }) } diff --git a/server/models/pod.js b/server/models/pod.js index 83ecd732e..8e7dd1fd8 100644 --- a/server/models/pod.js +++ b/server/models/pod.js @@ -115,11 +115,18 @@ function list (callback) { return this.findAll().asCallback(callback) } -function listAllIds (callback) { +function listAllIds (transaction, callback) { + if (!callback) { + callback = transaction + transaction = null + } + const query = { attributes: [ 'id' ] } + if (transaction) query.transaction = transaction + return this.findAll(query).asCallback(function (err, pods) { if (err) return callback(err) diff --git a/server/models/request.js b/server/models/request.js index bae227c05..1d6038044 100644 --- a/server/models/request.js +++ b/server/models/request.js @@ -291,8 +291,8 @@ function listWithLimitAndRandom (limit, callback) { order: [ [ 'id', 'ASC' ] ], - offset: start, - limit: limit, + // offset: start, + // limit: limit, include: [ this.sequelize.models.Pod ] } -- 2.41.0