From 1593c7a9f58ffaea8933f30f683f67c2b155f6b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Bouya?= Date: Fri, 20 Apr 2018 20:20:02 +0200 Subject: Store some information to redis --- main.py | 33 +++++++++++++++++---- market.py | 16 +++++++++- requirements.txt | 1 + store.py | 46 +++++++++++++++++++--------- tests/test_main.py | 53 +++++++++++++++++++++++++-------- tests/test_market.py | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++ tests/test_store.py | 47 +++++++++++++++++++++++++++-- 7 files changed, 245 insertions(+), 35 deletions(-) diff --git a/main.py b/main.py index f465d8d..13c2240 100644 --- a/main.py +++ b/main.py @@ -95,13 +95,25 @@ def parse_config(args): del(args.db_password) del(args.db_database) + redis_config = { + "host": args.redis_host, + "port": args.redis_port, + "db": args.redis_database, + } + if redis_config["host"].startswith("/"): + redis_config["unix_socket_path"] = redis_config.pop("host") + del(redis_config["port"]) + del(args.redis_host) + del(args.redis_port) + del(args.redis_database) + report_path = args.report_path if report_path is not None and not \ os.path.exists(report_path): os.makedirs(report_path) - return pg_config + return pg_config, redis_config def parse_args(argv): parser = configargparse.ArgumentParser( @@ -134,6 +146,10 @@ def parse_args(argv): help="Store report to database (default)") parser.add_argument("--no-report-db", action='store_false', dest="report_db", help="Don't store report to database") + parser.add_argument("--report-redis", action='store_true', default=False, dest="report_redis", + help="Store report to redis") + parser.add_argument("--no-report-redis", action='store_false', dest="report_redis", + help="Don't store report to redis (default)") parser.add_argument("--report-path", required=False, help="Where to store the reports (default: absent, don't store)") parser.add_argument("--no-report-path", action='store_const', dest='report_path', const=None, @@ -148,17 +164,24 @@ def parse_args(argv): help="Password access to database (default: cryptoportfolio)") parser.add_argument("--db-database", default="cryptoportfolio", help="Database access to database (default: cryptoportfolio)") + parser.add_argument("--redis-host", default="localhost", + help="Host access to database (default: localhost). Use path for socket") + parser.add_argument("--redis-port", default=6379, + help="Port access to redis (default: 6379)") + parser.add_argument("--redis-database", default=0, + help="Redis database to use (default: 0)") parsed = parser.parse_args(argv) if parsed.action is None: parsed.action = ["sell_all"] return parsed -def process(market_config, market_id, user_id, args, pg_config): +def process(market_config, market_id, user_id, args, pg_config, redis_config): try: market.Market\ .from_config(market_config, args, market_id=market_id, - pg_config=pg_config, user_id=user_id)\ + pg_config=pg_config, redis_config=redis_config, + user_id=user_id)\ .process(args.action, before=args.before, after=args.after) except Exception as e: print("{}: {}".format(e.__class__.__name__, e)) @@ -166,7 +189,7 @@ def process(market_config, market_id, user_id, args, pg_config): def main(argv): args = parse_args(argv) - pg_config = parse_config(args) + pg_config, redis_config = parse_config(args) market.Portfolio.report.set_verbose(not args.quiet) @@ -183,7 +206,7 @@ def main(argv): process_ = process for market_id, market_config, user_id in fetch_markets(pg_config, args.user): - process_(market_config, market_id, user_id, args, pg_config) + process_(market_config, market_id, user_id, args, pg_config, redis_config) if args.parallel: for thread in threads: diff --git a/market.py b/market.py index 4593eb1..ce0c48c 100644 --- a/market.py +++ b/market.py @@ -2,6 +2,7 @@ from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce import ccxt_wrapper as ccxt import time import psycopg2 +import redis from store import * from cachetools.func import ttl_cache from datetime import datetime @@ -26,7 +27,7 @@ class Market: self.balances = BalanceStore(self) self.processor = Processor(self) - for key in ["user_id", "market_id", "pg_config"]: + for key in ["user_id", "market_id", "pg_config", "redis_config"]: setattr(self, key, kwargs.get(key, None)) self.report.log_market(self.args) @@ -46,6 +47,8 @@ class Market: self.store_file_report(date) if self.pg_config is not None and self.args.report_db: self.store_database_report(date) + if self.redis_config is not None and self.args.report_redis: + self.store_redis_report(date) def store_file_report(self, date): try: @@ -74,6 +77,17 @@ class Market: except Exception as e: print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) + def store_redis_report(self, date): + try: + conn = redis.Redis(**self.redis_config) + for type_, log in self.report.to_json_redis(): + key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) + conn.set(key, log, ex=31*24*60*60) + key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) + conn.set(key, log) + except Exception as e: + print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) + def process(self, actions, before=False, after=False): try: for action in actions: diff --git a/requirements.txt b/requirements.txt index 2451c80..3a4db2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ psycopg2==2.7.4 retry==0.9.2 cachetools==2.0.1 configargparse==0.12.0 +redis==2.10.6 diff --git a/store.py b/store.py index 0c018e0..072d3a2 100644 --- a/store.py +++ b/store.py @@ -17,6 +17,7 @@ class ReportStore: self.print_logs = [] self.logs = [] + self.redis_status = [] self.no_http_dup = no_http_dup self.last_http = None @@ -46,6 +47,10 @@ class ReportStore: self.logs.append(hash_) return hash_ + def add_redis_status(self, hash_): + self.redis_status.append(hash_) + return hash_ + @staticmethod def default_json_serial(obj): if isinstance(obj, (datetime.datetime, datetime.date)): @@ -63,6 +68,13 @@ class ReportStore: json.dumps(log, default=self.default_json_serial, indent=" ") ) + def to_json_redis(self): + for log in (x.copy() for x in self.redis_status): + yield ( + log.pop("type"), + json.dumps(log, default=self.default_json_serial) + ) + def set_verbose(self, verbose_print): self.verbose_print = verbose_print @@ -91,11 +103,14 @@ class ReportStore: for currency, balance in self.market.balances.all.items(): self.print_log("\t{}".format(balance)) - self.add_log({ - "type": "balance", - "tag": tag, - "balances": self.market.balances.as_json() - }) + log = { + "type": "balance", + "tag": tag, + "balances": self.market.balances.as_json() + } + + self.add_log(log.copy()) + self.add_redis_status(log) def log_tickers(self, amounts, other_currency, compute_value, type): @@ -107,15 +122,18 @@ class ReportStore: for currency, amount in amounts.items(): values[currency] = amount.as_json()["value"] rates[currency] = amount.rate - self.add_log({ - "type": "tickers", - "compute_value": compute_value, - "balance_type": type, - "currency": other_currency, - "balances": values, - "rates": rates, - "total": sum(amounts.values()).as_json()["value"] - }) + log = { + "type": "tickers", + "compute_value": compute_value, + "balance_type": type, + "currency": other_currency, + "balances": values, + "rates": rates, + "total": sum(amounts.values()).as_json()["value"] + } + + self.add_log(log.copy()) + self.add_redis_status(log) def log_dispatch(self, amount, amounts, liquidity, repartition): self.add_log({ diff --git a/tests/test_main.py b/tests/test_main.py index d2f8029..b650870 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -135,16 +135,16 @@ class MainTest(WebMockTestCase): args_mock.after = "after" self.assertEqual("", stdout_mock.getvalue()) - main.process("config", 3, 1, args_mock, "pg_config") + main.process("config", 3, 1, args_mock, "pg_config", "redis_config") market_mock.from_config.assert_has_calls([ - mock.call("config", args_mock, pg_config="pg_config", market_id=3, user_id=1), + mock.call("config", args_mock, pg_config="pg_config", redis_config="redis_config", market_id=3, user_id=1), mock.call().process("action", before="before", after="after"), ]) with self.subTest(exception=True): market_mock.from_config.side_effect = Exception("boo") - main.process(3, "config", 1, args_mock, "pg_config") + main.process(3, "config", 1, args_mock, "pg_config", "redis_config") self.assertEqual("Exception: boo\n", stdout_mock.getvalue()) def test_main(self): @@ -159,7 +159,7 @@ class MainTest(WebMockTestCase): args_mock.user = "user" parse_args.return_value = args_mock - parse_config.return_value = "pg_config" + parse_config.return_value = ["pg_config", "redis_config"] fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]] @@ -171,8 +171,8 @@ class MainTest(WebMockTestCase): self.assertEqual(2, process.call_count) process.assert_has_calls([ - mock.call("config1", 3, 1, args_mock, "pg_config"), - mock.call("config2", 1, 2, args_mock, "pg_config"), + mock.call("config1", 3, 1, args_mock, "pg_config", "redis_config"), + mock.call("config2", 1, 2, args_mock, "pg_config", "redis_config"), ]) with self.subTest(parallel=True): with mock.patch("main.parse_args") as parse_args,\ @@ -187,7 +187,7 @@ class MainTest(WebMockTestCase): args_mock.user = "user" parse_args.return_value = args_mock - parse_config.return_value = "pg_config" + parse_config.return_value = ["pg_config", "redis_config"] fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]] @@ -202,9 +202,9 @@ class MainTest(WebMockTestCase): self.assertEqual(2, process.call_count) process.assert_has_calls([ mock.call.__bool__(), - mock.call("config1", 3, 1, args_mock, "pg_config"), + mock.call("config1", 3, 1, args_mock, "pg_config", "redis_config"), mock.call.__bool__(), - mock.call("config2", 1, 2, args_mock, "pg_config"), + mock.call("config2", 1, 2, args_mock, "pg_config", "redis_config"), ]) with self.subTest(quiet=True): with mock.patch("main.parse_args") as parse_args,\ @@ -219,7 +219,7 @@ class MainTest(WebMockTestCase): args_mock.user = "user" parse_args.return_value = args_mock - parse_config.return_value = "pg_config" + parse_config.return_value = ["pg_config", "redis_config"] fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]] @@ -240,7 +240,7 @@ class MainTest(WebMockTestCase): args_mock.user = "user" parse_args.return_value = args_mock - parse_config.return_value = "pg_config" + parse_config.return_value = ["pg_config", "redis_config"] fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]] @@ -259,15 +259,39 @@ class MainTest(WebMockTestCase): "db_user": "user", "db_password": "password", "db_database": "database", + "redis_host": "rhost", + "redis_port": "rport", + "redis_database": "rdb", "report_path": None, }) - result = main.parse_config(args) + db_config, redis_config = main.parse_config(args) self.assertEqual({ "host": "host", "port": "port", "user": "user", "password": "password", "database": "database" - }, result) + }, db_config) + self.assertEqual({ "host": "rhost", "port": "rport", "db": + "rdb"}, redis_config) + with self.assertRaises(AttributeError): args.db_password + with self.assertRaises(AttributeError): + args.redis_host + + with self.subTest(redis_host="socket"): + args = main.configargparse.Namespace(**{ + "db_host": "host", + "db_port": "port", + "db_user": "user", + "db_password": "password", + "db_database": "database", + "redis_host": "/run/foo", + "redis_port": "rport", + "redis_database": "rdb", + "report_path": None, + }) + + db_config, redis_config = main.parse_config(args) + self.assertEqual({ "unix_socket_path": "/run/foo", "db": "rdb"}, redis_config) with self.subTest(report_path="present"): args = main.configargparse.Namespace(**{ @@ -276,6 +300,9 @@ class MainTest(WebMockTestCase): "db_user": "user", "db_password": "password", "db_database": "database", + "redis_host": "rhost", + "redis_port": "rport", + "redis_database": "rdb", "report_path": "report_path", }) diff --git a/tests/test_market.py b/tests/test_market.py index b41cd6a..e3482b8 100644 --- a/tests/test_market.py +++ b/tests/test_market.py @@ -530,23 +530,55 @@ class MarketTest(WebMockTestCase): m.store_database_report(datetime.datetime(2018, 3, 24)) self.assertEqual(stdout_mock.getvalue(), "impossible to store report to database: Exception; Bouh\n") + @mock.patch.object(market, "redis") + def test_store_redis_report(self, redis): + connect_mock = mock.Mock() + redis.Redis.return_value = connect_mock + + m = market.Market(self.ccxt, self.market_args(), + redis_config={"config": "redis_config"}, market_id=1) + + with self.subTest(error=False),\ + mock.patch.object(m, "report") as report: + report.to_json_redis.return_value = [ + ("type1", "payload1"), + ("type2", "payload2"), + ] + m.store_redis_report(datetime.datetime(2018, 3, 24)) + connect_mock.assert_has_calls([ + mock.call.set("/cryptoportfolio/1/2018-03-24T00:00:00/type1", "payload1", ex=31*24*60*60), + mock.call.set("/cryptoportfolio/1/latest/type1", "payload1"), + mock.call.set("/cryptoportfolio/1/2018-03-24T00:00:00/type2", "payload2", ex=31*24*60*60), + mock.call.set("/cryptoportfolio/1/latest/type2", "payload2"), + ]) + + connect_mock.reset_mock() + with self.subTest(error=True),\ + mock.patch('sys.stdout', new_callable=StringIO) as stdout_mock: + redis.Redis.side_effect = Exception("Bouh") + m.store_redis_report(datetime.datetime(2018, 3, 24)) + self.assertEqual(stdout_mock.getvalue(), "impossible to store report to redis: Exception; Bouh\n") + def test_store_report(self): m = market.Market(self.ccxt, self.market_args(report_db=False), user_id=1) with self.subTest(file=None, pg_config=None),\ mock.patch.object(m, "report") as report,\ mock.patch.object(m, "store_database_report") as db_report,\ + mock.patch.object(m, "store_redis_report") as redis_report,\ mock.patch.object(m, "store_file_report") as file_report: m.store_report() report.merge.assert_called_with(store.Portfolio.report) file_report.assert_not_called() db_report.assert_not_called() + redis_report.assert_not_called() report.reset_mock() m = market.Market(self.ccxt, self.market_args(report_db=False, report_path="present"), user_id=1) with self.subTest(file="present", pg_config=None),\ mock.patch.object(m, "report") as report,\ mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_redis_report") as redis_report,\ mock.patch.object(m, "store_database_report") as db_report,\ mock.patch.object(market.datetime, "datetime") as time_mock: @@ -557,12 +589,14 @@ class MarketTest(WebMockTestCase): report.merge.assert_called_with(store.Portfolio.report) file_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) db_report.assert_not_called() + redis_report.assert_not_called() report.reset_mock() m = market.Market(self.ccxt, self.market_args(report_db=True, report_path="present"), user_id=1) with self.subTest(file="present", pg_config=None, report_db=True),\ mock.patch.object(m, "report") as report,\ mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_redis_report") as redis_report,\ mock.patch.object(m, "store_database_report") as db_report,\ mock.patch.object(market.datetime, "datetime") as time_mock: @@ -573,12 +607,14 @@ class MarketTest(WebMockTestCase): report.merge.assert_called_with(store.Portfolio.report) file_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) db_report.assert_not_called() + redis_report.assert_not_called() report.reset_mock() m = market.Market(self.ccxt, self.market_args(report_db=True), pg_config="present", user_id=1) with self.subTest(file=None, pg_config="present"),\ mock.patch.object(m, "report") as report,\ mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_redis_report") as redis_report,\ mock.patch.object(m, "store_database_report") as db_report,\ mock.patch.object(market.datetime, "datetime") as time_mock: @@ -589,6 +625,7 @@ class MarketTest(WebMockTestCase): report.merge.assert_called_with(store.Portfolio.report) file_report.assert_not_called() db_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) + redis_report.assert_not_called() report.reset_mock() m = market.Market(self.ccxt, self.market_args(report_db=True, report_path="present"), @@ -596,6 +633,7 @@ class MarketTest(WebMockTestCase): with self.subTest(file="present", pg_config="present"),\ mock.patch.object(m, "report") as report,\ mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_redis_report") as redis_report,\ mock.patch.object(m, "store_database_report") as db_report,\ mock.patch.object(market.datetime, "datetime") as time_mock: @@ -606,6 +644,52 @@ class MarketTest(WebMockTestCase): report.merge.assert_called_with(store.Portfolio.report) file_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) db_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) + redis_report.assert_not_called() + + report.reset_mock() + m = market.Market(self.ccxt, self.market_args(report_redis=False), + redis_config="redis_config", user_id=1) + with self.subTest(redis_config="present", report_redis=False),\ + mock.patch.object(m, "report") as report,\ + mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_redis_report") as redis_report,\ + mock.patch.object(m, "store_database_report") as db_report,\ + mock.patch.object(market.datetime, "datetime") as time_mock: + + time_mock.now.return_value = datetime.datetime(2018, 2, 25) + + m.store_report() + redis_report.assert_not_called() + + report.reset_mock() + m = market.Market(self.ccxt, self.market_args(report_redis=True), + user_id=1) + with self.subTest(redis_config="absent", report_redis=True),\ + mock.patch.object(m, "report") as report,\ + mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_redis_report") as redis_report,\ + mock.patch.object(m, "store_database_report") as db_report,\ + mock.patch.object(market.datetime, "datetime") as time_mock: + + time_mock.now.return_value = datetime.datetime(2018, 2, 25) + + m.store_report() + redis_report.assert_not_called() + + report.reset_mock() + m = market.Market(self.ccxt, self.market_args(report_redis=True), + redis_config="redis_config", user_id=1) + with self.subTest(redis_config="present", report_redis=True),\ + mock.patch.object(m, "report") as report,\ + mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_redis_report") as redis_report,\ + mock.patch.object(m, "store_database_report") as db_report,\ + mock.patch.object(market.datetime, "datetime") as time_mock: + + time_mock.now.return_value = datetime.datetime(2018, 2, 25) + + m.store_report() + redis_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) def test_print_tickers(self): m = market.Market(self.ccxt, self.market_args()) diff --git a/tests/test_store.py b/tests/test_store.py index ffd2645..df113b7 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -459,6 +459,13 @@ class ReportStoreTest(WebMockTestCase): self.assertEqual({"foo": "bar", "date": mock.ANY, "user_id": None, "market_id": None}, result) + def test_add_redis_status(self): + report_store = market.ReportStore(self.m) + result = report_store.add_redis_status({"foo": "bar"}) + + self.assertEqual({"foo": "bar"}, result) + self.assertEqual(result, report_store.redis_status[0]) + def test_set_verbose(self): report_store = market.ReportStore(self.m) with self.subTest(verbose=True): @@ -534,6 +541,20 @@ class ReportStoreTest(WebMockTestCase): self.assertEqual(("date1", "type1", '{\n "foo": "bar",\n "bla": "bla"\n}'), logs[0]) self.assertEqual(("date2", "type2", '{\n "foo": "bar",\n "bla": "bla"\n}'), logs[1]) + def test_to_json_redis(self): + report_store = market.ReportStore(self.m) + report_store.redis_status.append({ + "type": "type1", "foo": "bar", "bla": "bla" + }) + report_store.redis_status.append({ + "type": "type2", "foo": "bar", "bla": "bla" + }) + logs = list(report_store.to_json_redis()) + + self.assertEqual(2, len(logs)) + self.assertEqual(("type1", '{"foo": "bar", "bla": "bla"}'), logs[0]) + self.assertEqual(("type2", '{"foo": "bar", "bla": "bla"}'), logs[1]) + @mock.patch.object(market.ReportStore, "print_log") @mock.patch.object(market.ReportStore, "add_log") def test_log_stage(self, add_log, print_log): @@ -559,7 +580,8 @@ class ReportStoreTest(WebMockTestCase): @mock.patch.object(market.ReportStore, "print_log") @mock.patch.object(market.ReportStore, "add_log") - def test_log_balances(self, add_log, print_log): + @mock.patch.object(market.ReportStore, "add_redis_status") + def test_log_balances(self, add_redis_status, add_log, print_log): report_store = market.ReportStore(self.m) self.m.balances.as_json.return_value = "json" self.m.balances.all = { "FOO": "bar", "BAR": "baz" } @@ -575,10 +597,16 @@ class ReportStoreTest(WebMockTestCase): 'balances': 'json', 'tag': 'tag' }) + add_redis_status.assert_called_once_with({ + 'type': 'balance', + 'balances': 'json', + 'tag': 'tag' + }) @mock.patch.object(market.ReportStore, "print_log") @mock.patch.object(market.ReportStore, "add_log") - def test_log_tickers(self, add_log, print_log): + @mock.patch.object(market.ReportStore, "add_redis_status") + def test_log_tickers(self, add_redis_status, add_log, print_log): report_store = market.ReportStore(self.m) amounts = { "BTC": portfolio.Amount("BTC", 10), @@ -603,6 +631,21 @@ class ReportStoreTest(WebMockTestCase): }, 'total': D('10.3') }) + add_redis_status.assert_called_once_with({ + 'type': 'tickers', + 'compute_value': 'default', + 'balance_type': 'total', + 'currency': 'BTC', + 'balances': { + 'BTC': D('10'), + 'ETH': D('0.3') + }, + 'rates': { + 'BTC': None, + 'ETH': D('0.1') + }, + 'total': D('10.3') + }) add_log.reset_mock() compute_value = lambda x: x["bid"] -- cgit v1.2.3