del(args.db_password)
del(args.db_database)
+ redis_config = {
+ "host": args.redis_host,
+ "port": args.redis_port,
+ "db": args.redis_database,
+ }
+ if redis_config["host"].startswith("/"):
+ redis_config["unix_socket_path"] = redis_config.pop("host")
+ del(redis_config["port"])
+ del(args.redis_host)
+ del(args.redis_port)
+ del(args.redis_database)
+
report_path = args.report_path
if report_path is not None and not \
os.path.exists(report_path):
os.makedirs(report_path)
- return pg_config
+ return pg_config, redis_config
def parse_args(argv):
parser = configargparse.ArgumentParser(
help="Store report to database (default)")
parser.add_argument("--no-report-db", action='store_false', dest="report_db",
help="Don't store report to database")
+ parser.add_argument("--report-redis", action='store_true', default=False, dest="report_redis",
+ help="Store report to redis")
+ parser.add_argument("--no-report-redis", action='store_false', dest="report_redis",
+ help="Don't store report to redis (default)")
parser.add_argument("--report-path", required=False,
help="Where to store the reports (default: absent, don't store)")
parser.add_argument("--no-report-path", action='store_const', dest='report_path', const=None,
help="Password access to database (default: cryptoportfolio)")
parser.add_argument("--db-database", default="cryptoportfolio",
help="Database access to database (default: cryptoportfolio)")
+ parser.add_argument("--redis-host", default="localhost",
+ help="Host access to database (default: localhost). Use path for socket")
+ parser.add_argument("--redis-port", default=6379,
+ help="Port access to redis (default: 6379)")
+ parser.add_argument("--redis-database", default=0,
+ help="Redis database to use (default: 0)")
parsed = parser.parse_args(argv)
if parsed.action is None:
parsed.action = ["sell_all"]
return parsed
-def process(market_config, market_id, user_id, args, pg_config):
+def process(market_config, market_id, user_id, args, pg_config, redis_config):
try:
market.Market\
.from_config(market_config, args, market_id=market_id,
- pg_config=pg_config, user_id=user_id)\
+ pg_config=pg_config, redis_config=redis_config,
+ user_id=user_id)\
.process(args.action, before=args.before, after=args.after)
except Exception as e:
print("{}: {}".format(e.__class__.__name__, e))
def main(argv):
args = parse_args(argv)
- pg_config = parse_config(args)
+ pg_config, redis_config = parse_config(args)
market.Portfolio.report.set_verbose(not args.quiet)
process_ = process
for market_id, market_config, user_id in fetch_markets(pg_config, args.user):
- process_(market_config, market_id, user_id, args, pg_config)
+ process_(market_config, market_id, user_id, args, pg_config, redis_config)
if args.parallel:
for thread in threads:
import ccxt_wrapper as ccxt
import time
import psycopg2
+import redis
from store import *
from cachetools.func import ttl_cache
from datetime import datetime
self.balances = BalanceStore(self)
self.processor = Processor(self)
- for key in ["user_id", "market_id", "pg_config"]:
+ for key in ["user_id", "market_id", "pg_config", "redis_config"]:
setattr(self, key, kwargs.get(key, None))
self.report.log_market(self.args)
self.store_file_report(date)
if self.pg_config is not None and self.args.report_db:
self.store_database_report(date)
+ if self.redis_config is not None and self.args.report_redis:
+ self.store_redis_report(date)
def store_file_report(self, date):
try:
except Exception as e:
print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e))
+ def store_redis_report(self, date):
+ try:
+ conn = redis.Redis(**self.redis_config)
+ for type_, log in self.report.to_json_redis():
+ key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_)
+ conn.set(key, log, ex=31*24*60*60)
+ key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_)
+ conn.set(key, log)
+ except Exception as e:
+ print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e))
+
def process(self, actions, before=False, after=False):
try:
for action in actions:
retry==0.9.2
cachetools==2.0.1
configargparse==0.12.0
+redis==2.10.6
self.print_logs = []
self.logs = []
+ self.redis_status = []
self.no_http_dup = no_http_dup
self.last_http = None
self.logs.append(hash_)
return hash_
+ def add_redis_status(self, hash_):
+ self.redis_status.append(hash_)
+ return hash_
+
@staticmethod
def default_json_serial(obj):
if isinstance(obj, (datetime.datetime, datetime.date)):
json.dumps(log, default=self.default_json_serial, indent=" ")
)
+ def to_json_redis(self):
+ for log in (x.copy() for x in self.redis_status):
+ yield (
+ log.pop("type"),
+ json.dumps(log, default=self.default_json_serial)
+ )
+
def set_verbose(self, verbose_print):
self.verbose_print = verbose_print
for currency, balance in self.market.balances.all.items():
self.print_log("\t{}".format(balance))
- self.add_log({
- "type": "balance",
- "tag": tag,
- "balances": self.market.balances.as_json()
- })
+ log = {
+ "type": "balance",
+ "tag": tag,
+ "balances": self.market.balances.as_json()
+ }
+
+ self.add_log(log.copy())
+ self.add_redis_status(log)
def log_tickers(self, amounts, other_currency,
compute_value, type):
for currency, amount in amounts.items():
values[currency] = amount.as_json()["value"]
rates[currency] = amount.rate
- self.add_log({
- "type": "tickers",
- "compute_value": compute_value,
- "balance_type": type,
- "currency": other_currency,
- "balances": values,
- "rates": rates,
- "total": sum(amounts.values()).as_json()["value"]
- })
+ log = {
+ "type": "tickers",
+ "compute_value": compute_value,
+ "balance_type": type,
+ "currency": other_currency,
+ "balances": values,
+ "rates": rates,
+ "total": sum(amounts.values()).as_json()["value"]
+ }
+
+ self.add_log(log.copy())
+ self.add_redis_status(log)
def log_dispatch(self, amount, amounts, liquidity, repartition):
self.add_log({
args_mock.after = "after"
self.assertEqual("", stdout_mock.getvalue())
- main.process("config", 3, 1, args_mock, "pg_config")
+ main.process("config", 3, 1, args_mock, "pg_config", "redis_config")
market_mock.from_config.assert_has_calls([
- mock.call("config", args_mock, pg_config="pg_config", market_id=3, user_id=1),
+ mock.call("config", args_mock, pg_config="pg_config", redis_config="redis_config", market_id=3, user_id=1),
mock.call().process("action", before="before", after="after"),
])
with self.subTest(exception=True):
market_mock.from_config.side_effect = Exception("boo")
- main.process(3, "config", 1, args_mock, "pg_config")
+ main.process(3, "config", 1, args_mock, "pg_config", "redis_config")
self.assertEqual("Exception: boo\n", stdout_mock.getvalue())
def test_main(self):
args_mock.user = "user"
parse_args.return_value = args_mock
- parse_config.return_value = "pg_config"
+ parse_config.return_value = ["pg_config", "redis_config"]
fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]]
self.assertEqual(2, process.call_count)
process.assert_has_calls([
- mock.call("config1", 3, 1, args_mock, "pg_config"),
- mock.call("config2", 1, 2, args_mock, "pg_config"),
+ mock.call("config1", 3, 1, args_mock, "pg_config", "redis_config"),
+ mock.call("config2", 1, 2, args_mock, "pg_config", "redis_config"),
])
with self.subTest(parallel=True):
with mock.patch("main.parse_args") as parse_args,\
args_mock.user = "user"
parse_args.return_value = args_mock
- parse_config.return_value = "pg_config"
+ parse_config.return_value = ["pg_config", "redis_config"]
fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]]
self.assertEqual(2, process.call_count)
process.assert_has_calls([
mock.call.__bool__(),
- mock.call("config1", 3, 1, args_mock, "pg_config"),
+ mock.call("config1", 3, 1, args_mock, "pg_config", "redis_config"),
mock.call.__bool__(),
- mock.call("config2", 1, 2, args_mock, "pg_config"),
+ mock.call("config2", 1, 2, args_mock, "pg_config", "redis_config"),
])
with self.subTest(quiet=True):
with mock.patch("main.parse_args") as parse_args,\
args_mock.user = "user"
parse_args.return_value = args_mock
- parse_config.return_value = "pg_config"
+ parse_config.return_value = ["pg_config", "redis_config"]
fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]]
args_mock.user = "user"
parse_args.return_value = args_mock
- parse_config.return_value = "pg_config"
+ parse_config.return_value = ["pg_config", "redis_config"]
fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]]
"db_user": "user",
"db_password": "password",
"db_database": "database",
+ "redis_host": "rhost",
+ "redis_port": "rport",
+ "redis_database": "rdb",
"report_path": None,
})
- result = main.parse_config(args)
+ db_config, redis_config = main.parse_config(args)
self.assertEqual({ "host": "host", "port": "port", "user":
"user", "password": "password", "database": "database"
- }, result)
+ }, db_config)
+ self.assertEqual({ "host": "rhost", "port": "rport", "db":
+ "rdb"}, redis_config)
+
with self.assertRaises(AttributeError):
args.db_password
+ with self.assertRaises(AttributeError):
+ args.redis_host
+
+ with self.subTest(redis_host="socket"):
+ args = main.configargparse.Namespace(**{
+ "db_host": "host",
+ "db_port": "port",
+ "db_user": "user",
+ "db_password": "password",
+ "db_database": "database",
+ "redis_host": "/run/foo",
+ "redis_port": "rport",
+ "redis_database": "rdb",
+ "report_path": None,
+ })
+
+ db_config, redis_config = main.parse_config(args)
+ self.assertEqual({ "unix_socket_path": "/run/foo", "db": "rdb"}, redis_config)
with self.subTest(report_path="present"):
args = main.configargparse.Namespace(**{
"db_user": "user",
"db_password": "password",
"db_database": "database",
+ "redis_host": "rhost",
+ "redis_port": "rport",
+ "redis_database": "rdb",
"report_path": "report_path",
})
m.store_database_report(datetime.datetime(2018, 3, 24))
self.assertEqual(stdout_mock.getvalue(), "impossible to store report to database: Exception; Bouh\n")
+ @mock.patch.object(market, "redis")
+ def test_store_redis_report(self, redis):
+ connect_mock = mock.Mock()
+ redis.Redis.return_value = connect_mock
+
+ m = market.Market(self.ccxt, self.market_args(),
+ redis_config={"config": "redis_config"}, market_id=1)
+
+ with self.subTest(error=False),\
+ mock.patch.object(m, "report") as report:
+ report.to_json_redis.return_value = [
+ ("type1", "payload1"),
+ ("type2", "payload2"),
+ ]
+ m.store_redis_report(datetime.datetime(2018, 3, 24))
+ connect_mock.assert_has_calls([
+ mock.call.set("/cryptoportfolio/1/2018-03-24T00:00:00/type1", "payload1", ex=31*24*60*60),
+ mock.call.set("/cryptoportfolio/1/latest/type1", "payload1"),
+ mock.call.set("/cryptoportfolio/1/2018-03-24T00:00:00/type2", "payload2", ex=31*24*60*60),
+ mock.call.set("/cryptoportfolio/1/latest/type2", "payload2"),
+ ])
+
+ connect_mock.reset_mock()
+ with self.subTest(error=True),\
+ mock.patch('sys.stdout', new_callable=StringIO) as stdout_mock:
+ redis.Redis.side_effect = Exception("Bouh")
+ m.store_redis_report(datetime.datetime(2018, 3, 24))
+ self.assertEqual(stdout_mock.getvalue(), "impossible to store report to redis: Exception; Bouh\n")
+
def test_store_report(self):
m = market.Market(self.ccxt, self.market_args(report_db=False), user_id=1)
with self.subTest(file=None, pg_config=None),\
mock.patch.object(m, "report") as report,\
mock.patch.object(m, "store_database_report") as db_report,\
+ mock.patch.object(m, "store_redis_report") as redis_report,\
mock.patch.object(m, "store_file_report") as file_report:
m.store_report()
report.merge.assert_called_with(store.Portfolio.report)
file_report.assert_not_called()
db_report.assert_not_called()
+ redis_report.assert_not_called()
report.reset_mock()
m = market.Market(self.ccxt, self.market_args(report_db=False, report_path="present"), user_id=1)
with self.subTest(file="present", pg_config=None),\
mock.patch.object(m, "report") as report,\
mock.patch.object(m, "store_file_report") as file_report,\
+ mock.patch.object(m, "store_redis_report") as redis_report,\
mock.patch.object(m, "store_database_report") as db_report,\
mock.patch.object(market.datetime, "datetime") as time_mock:
report.merge.assert_called_with(store.Portfolio.report)
file_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
db_report.assert_not_called()
+ redis_report.assert_not_called()
report.reset_mock()
m = market.Market(self.ccxt, self.market_args(report_db=True, report_path="present"), user_id=1)
with self.subTest(file="present", pg_config=None, report_db=True),\
mock.patch.object(m, "report") as report,\
mock.patch.object(m, "store_file_report") as file_report,\
+ mock.patch.object(m, "store_redis_report") as redis_report,\
mock.patch.object(m, "store_database_report") as db_report,\
mock.patch.object(market.datetime, "datetime") as time_mock:
report.merge.assert_called_with(store.Portfolio.report)
file_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
db_report.assert_not_called()
+ redis_report.assert_not_called()
report.reset_mock()
m = market.Market(self.ccxt, self.market_args(report_db=True), pg_config="present", user_id=1)
with self.subTest(file=None, pg_config="present"),\
mock.patch.object(m, "report") as report,\
mock.patch.object(m, "store_file_report") as file_report,\
+ mock.patch.object(m, "store_redis_report") as redis_report,\
mock.patch.object(m, "store_database_report") as db_report,\
mock.patch.object(market.datetime, "datetime") as time_mock:
report.merge.assert_called_with(store.Portfolio.report)
file_report.assert_not_called()
db_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
+ redis_report.assert_not_called()
report.reset_mock()
m = market.Market(self.ccxt, self.market_args(report_db=True, report_path="present"),
with self.subTest(file="present", pg_config="present"),\
mock.patch.object(m, "report") as report,\
mock.patch.object(m, "store_file_report") as file_report,\
+ mock.patch.object(m, "store_redis_report") as redis_report,\
mock.patch.object(m, "store_database_report") as db_report,\
mock.patch.object(market.datetime, "datetime") as time_mock:
report.merge.assert_called_with(store.Portfolio.report)
file_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
db_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
+ redis_report.assert_not_called()
+
+ report.reset_mock()
+ m = market.Market(self.ccxt, self.market_args(report_redis=False),
+ redis_config="redis_config", user_id=1)
+ with self.subTest(redis_config="present", report_redis=False),\
+ mock.patch.object(m, "report") as report,\
+ mock.patch.object(m, "store_file_report") as file_report,\
+ mock.patch.object(m, "store_redis_report") as redis_report,\
+ mock.patch.object(m, "store_database_report") as db_report,\
+ mock.patch.object(market.datetime, "datetime") as time_mock:
+
+ time_mock.now.return_value = datetime.datetime(2018, 2, 25)
+
+ m.store_report()
+ redis_report.assert_not_called()
+
+ report.reset_mock()
+ m = market.Market(self.ccxt, self.market_args(report_redis=True),
+ user_id=1)
+ with self.subTest(redis_config="absent", report_redis=True),\
+ mock.patch.object(m, "report") as report,\
+ mock.patch.object(m, "store_file_report") as file_report,\
+ mock.patch.object(m, "store_redis_report") as redis_report,\
+ mock.patch.object(m, "store_database_report") as db_report,\
+ mock.patch.object(market.datetime, "datetime") as time_mock:
+
+ time_mock.now.return_value = datetime.datetime(2018, 2, 25)
+
+ m.store_report()
+ redis_report.assert_not_called()
+
+ report.reset_mock()
+ m = market.Market(self.ccxt, self.market_args(report_redis=True),
+ redis_config="redis_config", user_id=1)
+ with self.subTest(redis_config="present", report_redis=True),\
+ mock.patch.object(m, "report") as report,\
+ mock.patch.object(m, "store_file_report") as file_report,\
+ mock.patch.object(m, "store_redis_report") as redis_report,\
+ mock.patch.object(m, "store_database_report") as db_report,\
+ mock.patch.object(market.datetime, "datetime") as time_mock:
+
+ time_mock.now.return_value = datetime.datetime(2018, 2, 25)
+
+ m.store_report()
+ redis_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
def test_print_tickers(self):
m = market.Market(self.ccxt, self.market_args())
self.assertEqual({"foo": "bar", "date": mock.ANY, "user_id": None, "market_id": None}, result)
+ def test_add_redis_status(self):
+ report_store = market.ReportStore(self.m)
+ result = report_store.add_redis_status({"foo": "bar"})
+
+ self.assertEqual({"foo": "bar"}, result)
+ self.assertEqual(result, report_store.redis_status[0])
+
def test_set_verbose(self):
report_store = market.ReportStore(self.m)
with self.subTest(verbose=True):
self.assertEqual(("date1", "type1", '{\n "foo": "bar",\n "bla": "bla"\n}'), logs[0])
self.assertEqual(("date2", "type2", '{\n "foo": "bar",\n "bla": "bla"\n}'), logs[1])
+ def test_to_json_redis(self):
+ report_store = market.ReportStore(self.m)
+ report_store.redis_status.append({
+ "type": "type1", "foo": "bar", "bla": "bla"
+ })
+ report_store.redis_status.append({
+ "type": "type2", "foo": "bar", "bla": "bla"
+ })
+ logs = list(report_store.to_json_redis())
+
+ self.assertEqual(2, len(logs))
+ self.assertEqual(("type1", '{"foo": "bar", "bla": "bla"}'), logs[0])
+ self.assertEqual(("type2", '{"foo": "bar", "bla": "bla"}'), logs[1])
+
@mock.patch.object(market.ReportStore, "print_log")
@mock.patch.object(market.ReportStore, "add_log")
def test_log_stage(self, add_log, print_log):
@mock.patch.object(market.ReportStore, "print_log")
@mock.patch.object(market.ReportStore, "add_log")
- def test_log_balances(self, add_log, print_log):
+ @mock.patch.object(market.ReportStore, "add_redis_status")
+ def test_log_balances(self, add_redis_status, add_log, print_log):
report_store = market.ReportStore(self.m)
self.m.balances.as_json.return_value = "json"
self.m.balances.all = { "FOO": "bar", "BAR": "baz" }
'balances': 'json',
'tag': 'tag'
})
+ add_redis_status.assert_called_once_with({
+ 'type': 'balance',
+ 'balances': 'json',
+ 'tag': 'tag'
+ })
@mock.patch.object(market.ReportStore, "print_log")
@mock.patch.object(market.ReportStore, "add_log")
- def test_log_tickers(self, add_log, print_log):
+ @mock.patch.object(market.ReportStore, "add_redis_status")
+ def test_log_tickers(self, add_redis_status, add_log, print_log):
report_store = market.ReportStore(self.m)
amounts = {
"BTC": portfolio.Amount("BTC", 10),
},
'total': D('10.3')
})
+ add_redis_status.assert_called_once_with({
+ 'type': 'tickers',
+ 'compute_value': 'default',
+ 'balance_type': 'total',
+ 'currency': 'BTC',
+ 'balances': {
+ 'BTC': D('10'),
+ 'ETH': D('0.3')
+ },
+ 'rates': {
+ 'BTC': None,
+ 'ETH': D('0.1')
+ },
+ 'total': D('10.3')
+ })
add_log.reset_mock()
compute_value = lambda x: x["bid"]