From dc1ca9a306f09886c6c57f8d426c59a9d084b2b3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Fri, 9 Mar 2018 15:37:10 +0100 Subject: [PATCH 01/16] Add parallelization --- main.py | 13 +- store.py | 128 ++++++++++++++--- test.py | 418 +++++++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 431 insertions(+), 128 deletions(-) diff --git a/main.py b/main.py index 37f485d..856d449 100644 --- a/main.py +++ b/main.py @@ -117,6 +117,8 @@ def parse_args(argv): parser.add_argument("--action", action='append', help="Do a different action than trading (add several times to chain)") + parser.add_argument("--parallel", action='store_true', default=True, dest="parallel") + parser.add_argument("--no-parallel", action='store_false', dest="parallel") args = parser.parse_args(argv) @@ -139,8 +141,15 @@ def main(argv): pg_config, report_path = parse_config(args.config) - for market_config, user_id in fetch_markets(pg_config, args.user): - process(market_config, user_id, report_path, args) + if args.parallel: + import threading + market.Portfolio.start_worker() + + for market_config, user_id in fetch_markets(pg_config, args.user): + threading.Thread(target=process, args=[market_config, user_id, report_path, args]).start() + else: + for market_config, user_id in fetch_markets(pg_config, args.user): + process(market_config, user_id, report_path, args) if __name__ == '__main__': # pragma: no cover main(sys.argv[1:]) diff --git a/store.py b/store.py index c8cdc42..f655be5 100644 --- a/store.py +++ b/store.py @@ -309,30 +309,110 @@ class TradeStore: for order in self.all_orders(state="open"): order.get_status() +class NoopLock: + def __enter__(self, *args): + pass + def __exit__(self, *args): + pass + +class LockedVar: + def __init__(self, value): + self.lock = NoopLock() + self.val = value + + def start_lock(self): + import threading + self.lock = threading.Lock() + + def set(self, value): + with self.lock: + self.val = value + + def get(self, key=None): + with self.lock: + if key is not None and isinstance(self.val, dict): + return self.val.get(key) + else: + return self.val + + def __getattr__(self, key): + with self.lock: + return getattr(self.val, key) + class Portfolio: URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json" - liquidities = {} - data = None - last_date = None - report = ReportStore(None) + data = LockedVar(None) + liquidities = LockedVar({}) + last_date = LockedVar(None) + report = LockedVar(ReportStore(None)) + worker = None + worker_started = False + worker_notify = None + callback = None + + @classmethod + def start_worker(cls, poll=30): + import threading + + cls.worker = threading.Thread(name="portfolio", daemon=True, + target=cls.wait_for_notification, kwargs={"poll": poll}) + cls.worker_notify = threading.Event() + cls.callback = threading.Event() + + cls.last_date.start_lock() + cls.liquidities.start_lock() + cls.report.start_lock() + + cls.worker_started = True + cls.worker.start() + + @classmethod + def is_worker_thread(cls): + if cls.worker is None: + return False + else: + import threading + return cls.worker == threading.current_thread() + + @classmethod + def wait_for_notification(cls, poll=30): + if not cls.is_worker_thread(): + raise RuntimeError("This method needs to be ran with the worker") + while cls.worker_started: + cls.worker_notify.wait() + cls.worker_notify.clear() + cls.report.print_log("Fetching cryptoportfolio") + cls.get_cryptoportfolio(refetch=True) + cls.callback.set() + time.sleep(poll) @classmethod - def wait_for_recent(cls, delta=4): + def notify_and_wait(cls): + cls.callback.clear() + cls.worker_notify.set() + cls.callback.wait() + + @classmethod + def wait_for_recent(cls, delta=4, poll=30): cls.get_cryptoportfolio() - while cls.last_date is None or datetime.now() - cls.last_date > timedelta(delta): - time.sleep(30) - cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") + while cls.last_date.get() is None or datetime.now() - cls.last_date.get() > timedelta(delta): + if cls.worker is None: + time.sleep(poll) + cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") cls.get_cryptoportfolio(refetch=True) @classmethod def repartition(cls, liquidity="medium"): cls.get_cryptoportfolio() - liquidities = cls.liquidities[liquidity] - return liquidities[cls.last_date] + liquidities = cls.liquidities.get(liquidity) + return liquidities[cls.last_date.get()] @classmethod def get_cryptoportfolio(cls, refetch=False): - if cls.data is not None and not refetch: + if cls.data.get() is not None and not refetch: + return + if cls.worker is not None and not cls.is_worker_thread(): + cls.notify_and_wait() return try: r = requests.get(cls.URL) @@ -342,11 +422,12 @@ class Portfolio: cls.report.log_error("get_cryptoportfolio", exception=e) return try: - cls.data = r.json(parse_int=D, parse_float=D) + cls.data.set(r.json(parse_int=D, parse_float=D)) cls.parse_cryptoportfolio() except (JSONDecodeError, SimpleJSONDecodeError): - cls.data = None - cls.liquidities = {} + cls.data.set(None) + cls.last_date.set(None) + cls.liquidities.set({}) @classmethod def parse_cryptoportfolio(cls): @@ -366,6 +447,8 @@ class Portfolio: return clean_weights_ def parse_weights(portfolio_hash): + if "weights" not in portfolio_hash: + return {} weights_hash = portfolio_hash["weights"] weights = {} for i in range(len(weights_hash["_row"])): @@ -375,13 +458,16 @@ class Portfolio: map(clean_weights(i), weights_hash.items()))) return weights - high_liquidity = parse_weights(cls.data["portfolio_1"]) - medium_liquidity = parse_weights(cls.data["portfolio_2"]) + high_liquidity = parse_weights(cls.data.get("portfolio_1")) + medium_liquidity = parse_weights(cls.data.get("portfolio_2")) - cls.liquidities = { - "medium": medium_liquidity, - "high": high_liquidity, - } - cls.last_date = max(max(medium_liquidity.keys()), max(high_liquidity.keys())) + cls.liquidities.set({ + "medium": medium_liquidity, + "high": high_liquidity, + }) + cls.last_date.set(max( + max(medium_liquidity.keys(), default=datetime(1, 1, 1)), + max(high_liquidity.keys(), default=datetime(1, 1, 1)) + )) diff --git a/test.py b/test.py index c0e6a8a..f61e739 100644 --- a/test.py +++ b/test.py @@ -7,6 +7,7 @@ from unittest import mock import requests import requests_mock from io import StringIO +import threading import portfolio, market, main, store limits = ["acceptance", "unit"] @@ -33,10 +34,14 @@ class WebMockTestCase(unittest.TestCase): self.patchers = [ mock.patch.multiple(market.Portfolio, - last_date=None, - data=None, - liquidities={}, - report=mock.Mock()), + data=store.LockedVar(None), + liquidities=store.LockedVar({}), + last_date=store.LockedVar(None), + report=mock.Mock(), + worker=None, + worker_notify=None, + worker_started=False, + callback=None), mock.patch.multiple(portfolio.Computation, computations=portfolio.Computation.computations), ] @@ -441,6 +446,99 @@ class poloniexETest(unittest.TestCase): create_order.assert_called_once_with("symbol", "type", "side", "amount", price="price", params="params") +@unittest.skipUnless("unit" in limits, "Unit skipped") +class NoopLockTest(unittest.TestCase): + def test_with(self): + noop_lock = store.NoopLock() + with noop_lock: + self.assertTrue(True) + +@unittest.skipUnless("unit" in limits, "Unit skipped") +class LockedVar(unittest.TestCase): + + def test_values(self): + locked_var = store.LockedVar("Foo") + self.assertIsInstance(locked_var.lock, store.NoopLock) + self.assertEqual("Foo", locked_var.val) + + def test_get(self): + with self.subTest(desc="Normal case"): + locked_var = store.LockedVar("Foo") + self.assertEqual("Foo", locked_var.get()) + with self.subTest(desc="Dict"): + locked_var = store.LockedVar({"foo": "bar"}) + self.assertEqual({"foo": "bar"}, locked_var.get()) + self.assertEqual("bar", locked_var.get("foo")) + self.assertIsNone(locked_var.get("other")) + + def test_set(self): + locked_var = store.LockedVar("Foo") + locked_var.set("Bar") + self.assertEqual("Bar", locked_var.get()) + + def test__getattr(self): + dummy = type('Dummy', (object,), {})() + dummy.attribute = "Hey" + + locked_var = store.LockedVar(dummy) + self.assertEqual("Hey", locked_var.attribute) + with self.assertRaises(AttributeError): + locked_var.other + + def test_start_lock(self): + locked_var = store.LockedVar("Foo") + locked_var.start_lock() + self.assertEqual("lock", locked_var.lock.__class__.__name__) + + thread1 = threading.Thread(target=locked_var.set, args=["Bar1"]) + thread2 = threading.Thread(target=locked_var.set, args=["Bar2"]) + thread3 = threading.Thread(target=locked_var.set, args=["Bar3"]) + + with locked_var.lock: + thread1.start() + thread2.start() + thread3.start() + + self.assertEqual("Foo", locked_var.val) + thread1.join() + thread2.join() + thread3.join() + self.assertEqual("Bar", locked_var.get()[0:3]) + + def test_wait_for_notification(self): + with self.assertRaises(RuntimeError): + store.Portfolio.wait_for_notification() + + with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\ + mock.patch.object(store.Portfolio, "report") as report,\ + mock.patch.object(store.time, "sleep") as sleep: + store.Portfolio.start_worker(poll=3) + + store.Portfolio.worker_notify.set() + + store.Portfolio.callback.wait() + + report.print_log.assert_called_once_with("Fetching cryptoportfolio") + get.assert_called_once_with(refetch=True) + sleep.assert_called_once_with(3) + self.assertFalse(store.Portfolio.worker_notify.is_set()) + self.assertTrue(store.Portfolio.worker.is_alive()) + + store.Portfolio.callback.clear() + store.Portfolio.worker_started = False + store.Portfolio.worker_notify.set() + store.Portfolio.callback.wait() + + self.assertFalse(store.Portfolio.worker.is_alive()) + + def test_notify_and_wait(self): + with mock.patch.object(store.Portfolio, "callback") as callback,\ + mock.patch.object(store.Portfolio, "worker_notify") as worker_notify: + store.Portfolio.notify_and_wait() + callback.clear.assert_called_once_with() + worker_notify.set.assert_called_once_with() + callback.wait.assert_called_once_with() + @unittest.skipUnless("unit" in limits, "Unit skipped") class PortfolioTest(WebMockTestCase): def setUp(self): @@ -453,86 +551,131 @@ class PortfolioTest(WebMockTestCase): @mock.patch.object(market.Portfolio, "parse_cryptoportfolio") def test_get_cryptoportfolio(self, parse_cryptoportfolio): - self.wm.get(market.Portfolio.URL, [ - {"text":'{ "foo": "bar" }', "status_code": 200}, - {"text": "System Error", "status_code": 500}, - {"exc": requests.exceptions.ConnectTimeout}, - ]) - market.Portfolio.get_cryptoportfolio() - self.assertIn("foo", market.Portfolio.data) - self.assertEqual("bar", market.Portfolio.data["foo"]) - self.assertTrue(self.wm.called) - self.assertEqual(1, self.wm.call_count) - market.Portfolio.report.log_error.assert_not_called() - market.Portfolio.report.log_http_request.assert_called_once() - parse_cryptoportfolio.assert_called_once_with() - market.Portfolio.report.log_http_request.reset_mock() - parse_cryptoportfolio.reset_mock() - market.Portfolio.data = None - - market.Portfolio.get_cryptoportfolio() - self.assertIsNone(market.Portfolio.data) - self.assertEqual(2, self.wm.call_count) - parse_cryptoportfolio.assert_not_called() - market.Portfolio.report.log_error.assert_not_called() - market.Portfolio.report.log_http_request.assert_called_once() - market.Portfolio.report.log_http_request.reset_mock() - parse_cryptoportfolio.reset_mock() - - market.Portfolio.data = "Foo" - market.Portfolio.get_cryptoportfolio() - self.assertEqual(2, self.wm.call_count) - parse_cryptoportfolio.assert_not_called() - - market.Portfolio.get_cryptoportfolio(refetch=True) - self.assertEqual("Foo", market.Portfolio.data) - self.assertEqual(3, self.wm.call_count) - market.Portfolio.report.log_error.assert_called_once_with("get_cryptoportfolio", - exception=mock.ANY) - market.Portfolio.report.log_http_request.assert_not_called() + with self.subTest(parallel=False): + self.wm.get(market.Portfolio.URL, [ + {"text":'{ "foo": "bar" }', "status_code": 200}, + {"text": "System Error", "status_code": 500}, + {"exc": requests.exceptions.ConnectTimeout}, + ]) + market.Portfolio.get_cryptoportfolio() + self.assertIn("foo", market.Portfolio.data.get()) + self.assertEqual("bar", market.Portfolio.data.get()["foo"]) + self.assertTrue(self.wm.called) + self.assertEqual(1, self.wm.call_count) + market.Portfolio.report.log_error.assert_not_called() + market.Portfolio.report.log_http_request.assert_called_once() + parse_cryptoportfolio.assert_called_once_with() + market.Portfolio.report.log_http_request.reset_mock() + parse_cryptoportfolio.reset_mock() + market.Portfolio.data = store.LockedVar(None) + + market.Portfolio.get_cryptoportfolio() + self.assertIsNone(market.Portfolio.data.get()) + self.assertEqual(2, self.wm.call_count) + parse_cryptoportfolio.assert_not_called() + market.Portfolio.report.log_error.assert_not_called() + market.Portfolio.report.log_http_request.assert_called_once() + market.Portfolio.report.log_http_request.reset_mock() + parse_cryptoportfolio.reset_mock() + + market.Portfolio.data = store.LockedVar("Foo") + market.Portfolio.get_cryptoportfolio() + self.assertEqual(2, self.wm.call_count) + parse_cryptoportfolio.assert_not_called() + + market.Portfolio.get_cryptoportfolio(refetch=True) + self.assertEqual("Foo", market.Portfolio.data.get()) + self.assertEqual(3, self.wm.call_count) + market.Portfolio.report.log_error.assert_called_once_with("get_cryptoportfolio", + exception=mock.ANY) + market.Portfolio.report.log_http_request.assert_not_called() + with self.subTest(parallel=True): + with mock.patch.object(market.Portfolio, "is_worker_thread") as is_worker,\ + mock.patch.object(market.Portfolio, "notify_and_wait") as notify: + with self.subTest(worker=True): + market.Portfolio.data = store.LockedVar(None) + market.Portfolio.worker = mock.Mock() + is_worker.return_value = True + self.wm.get(market.Portfolio.URL, [ + {"text":'{ "foo": "bar" }', "status_code": 200}, + ]) + market.Portfolio.get_cryptoportfolio() + self.assertIn("foo", market.Portfolio.data.get()) + parse_cryptoportfolio.reset_mock() + with self.subTest(worker=False): + market.Portfolio.data = store.LockedVar(None) + market.Portfolio.worker = mock.Mock() + is_worker.return_value = False + market.Portfolio.get_cryptoportfolio() + notify.assert_called_once_with() + parse_cryptoportfolio.assert_not_called() def test_parse_cryptoportfolio(self): - market.Portfolio.data = store.json.loads(self.json_response, parse_int=D, - parse_float=D) - market.Portfolio.parse_cryptoportfolio() - - self.assertListEqual( - ["medium", "high"], - list(market.Portfolio.liquidities.keys())) - - liquidities = market.Portfolio.liquidities - self.assertEqual(10, len(liquidities["medium"].keys())) - self.assertEqual(10, len(liquidities["high"].keys())) - - expected = { - 'BTC': (D("0.2857"), "long"), - 'DGB': (D("0.1015"), "long"), - 'DOGE': (D("0.1805"), "long"), - 'SC': (D("0.0623"), "long"), - 'ZEC': (D("0.3701"), "long"), - } - date = portfolio.datetime(2018, 1, 8) - self.assertDictEqual(expected, liquidities["high"][date]) - - expected = { - 'BTC': (D("1.1102e-16"), "long"), - 'ETC': (D("0.1"), "long"), - 'FCT': (D("0.1"), "long"), - 'GAS': (D("0.1"), "long"), - 'NAV': (D("0.1"), "long"), - 'OMG': (D("0.1"), "long"), - 'OMNI': (D("0.1"), "long"), - 'PPC': (D("0.1"), "long"), - 'RIC': (D("0.1"), "long"), - 'VIA': (D("0.1"), "long"), - 'XCP': (D("0.1"), "long"), - } - self.assertDictEqual(expected, liquidities["medium"][date]) - self.assertEqual(portfolio.datetime(2018, 1, 15), market.Portfolio.last_date) + with self.subTest(description="Normal case"): + market.Portfolio.data = store.LockedVar(store.json.loads( + self.json_response, parse_int=D, parse_float=D)) + market.Portfolio.parse_cryptoportfolio() + + self.assertListEqual( + ["medium", "high"], + list(market.Portfolio.liquidities.get().keys())) + + liquidities = market.Portfolio.liquidities.get() + self.assertEqual(10, len(liquidities["medium"].keys())) + self.assertEqual(10, len(liquidities["high"].keys())) + + expected = { + 'BTC': (D("0.2857"), "long"), + 'DGB': (D("0.1015"), "long"), + 'DOGE': (D("0.1805"), "long"), + 'SC': (D("0.0623"), "long"), + 'ZEC': (D("0.3701"), "long"), + } + date = portfolio.datetime(2018, 1, 8) + self.assertDictEqual(expected, liquidities["high"][date]) + + expected = { + 'BTC': (D("1.1102e-16"), "long"), + 'ETC': (D("0.1"), "long"), + 'FCT': (D("0.1"), "long"), + 'GAS': (D("0.1"), "long"), + 'NAV': (D("0.1"), "long"), + 'OMG': (D("0.1"), "long"), + 'OMNI': (D("0.1"), "long"), + 'PPC': (D("0.1"), "long"), + 'RIC': (D("0.1"), "long"), + 'VIA': (D("0.1"), "long"), + 'XCP': (D("0.1"), "long"), + } + self.assertDictEqual(expected, liquidities["medium"][date]) + self.assertEqual(portfolio.datetime(2018, 1, 15), market.Portfolio.last_date.get()) + + with self.subTest(description="Missing weight"): + data = store.json.loads(self.json_response, parse_int=D, parse_float=D) + del(data["portfolio_2"]["weights"]) + market.Portfolio.data = store.LockedVar(data) + + market.Portfolio.parse_cryptoportfolio() + self.assertListEqual( + ["medium", "high"], + list(market.Portfolio.liquidities.get().keys())) + self.assertEqual({}, market.Portfolio.liquidities.get("medium")) + + with self.subTest(description="All missing weights"): + data = store.json.loads(self.json_response, parse_int=D, parse_float=D) + del(data["portfolio_1"]["weights"]) + del(data["portfolio_2"]["weights"]) + market.Portfolio.data = store.LockedVar(data) + + market.Portfolio.parse_cryptoportfolio() + self.assertEqual({}, market.Portfolio.liquidities.get("medium")) + self.assertEqual({}, market.Portfolio.liquidities.get("high")) + self.assertEqual(datetime.datetime(1,1,1), market.Portfolio.last_date.get()) + @mock.patch.object(market.Portfolio, "get_cryptoportfolio") def test_repartition(self, get_cryptoportfolio): - market.Portfolio.liquidities = { + market.Portfolio.liquidities = store.LockedVar({ "medium": { "2018-03-01": "medium_2018-03-01", "2018-03-08": "medium_2018-03-08", @@ -541,8 +684,8 @@ class PortfolioTest(WebMockTestCase): "2018-03-01": "high_2018-03-01", "2018-03-08": "high_2018-03-08", } - } - market.Portfolio.last_date = "2018-03-08" + }) + market.Portfolio.last_date = store.LockedVar("2018-03-08") self.assertEqual("medium_2018-03-08", market.Portfolio.repartition()) get_cryptoportfolio.assert_called_once_with() @@ -559,9 +702,9 @@ class PortfolioTest(WebMockTestCase): else: self.assertFalse(refetch) self.call_count += 1 - market.Portfolio.last_date = store.datetime.now()\ + market.Portfolio.last_date = store.LockedVar(store.datetime.now()\ - store.timedelta(10)\ - + store.timedelta(self.call_count) + + store.timedelta(self.call_count)) get_cryptoportfolio.side_effect = _get market.Portfolio.wait_for_recent() @@ -572,7 +715,7 @@ class PortfolioTest(WebMockTestCase): sleep.reset_mock() get_cryptoportfolio.reset_mock() - market.Portfolio.last_date = None + market.Portfolio.last_date = store.LockedVar(None) self.call_count = 0 market.Portfolio.wait_for_recent(delta=15) sleep.assert_not_called() @@ -580,13 +723,45 @@ class PortfolioTest(WebMockTestCase): sleep.reset_mock() get_cryptoportfolio.reset_mock() - market.Portfolio.last_date = None + market.Portfolio.last_date = store.LockedVar(None) self.call_count = 0 market.Portfolio.wait_for_recent(delta=1) sleep.assert_called_with(30) self.assertEqual(9, sleep.call_count) self.assertEqual(10, get_cryptoportfolio.call_count) + def test_is_worker_thread(self): + with self.subTest(worker=None): + self.assertFalse(store.Portfolio.is_worker_thread()) + + with self.subTest(worker="not self"),\ + mock.patch("threading.current_thread") as current_thread: + current = mock.Mock() + current_thread.return_value = current + store.Portfolio.worker = mock.Mock() + self.assertFalse(store.Portfolio.is_worker_thread()) + + with self.subTest(worker="self"),\ + mock.patch("threading.current_thread") as current_thread: + current = mock.Mock() + current_thread.return_value = current + store.Portfolio.worker = current + self.assertTrue(store.Portfolio.is_worker_thread()) + + def test_start_worker(self): + with mock.patch.object(store.Portfolio, "wait_for_notification") as notification: + store.Portfolio.start_worker() + notification.assert_called_once_with(poll=30) + + self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__) + self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__) + store.Portfolio.report.start_lock.assert_called_once_with() + + self.assertIsNotNone(store.Portfolio.worker) + self.assertIsNotNone(store.Portfolio.worker_notify) + self.assertIsNotNone(store.Portfolio.callback) + self.assertTrue(store.Portfolio.worker_started) + @unittest.skipUnless("unit" in limits, "Unit skipped") class AmountTest(WebMockTestCase): def test_values(self): @@ -3362,31 +3537,64 @@ class MainTest(WebMockTestCase): self.assertEqual("Exception: boo\n", stdout_mock.getvalue()) def test_main(self): - with mock.patch("main.parse_args") as parse_args,\ - mock.patch("main.parse_config") as parse_config,\ - mock.patch("main.fetch_markets") as fetch_markets,\ - mock.patch("main.process") as process: + with self.subTest(parallel=False): + with mock.patch("main.parse_args") as parse_args,\ + mock.patch("main.parse_config") as parse_config,\ + mock.patch("main.fetch_markets") as fetch_markets,\ + mock.patch("main.process") as process: - args_mock = mock.Mock() - args_mock.config = "config" - args_mock.user = "user" - parse_args.return_value = args_mock + args_mock = mock.Mock() + args_mock.parallel = False + args_mock.config = "config" + args_mock.user = "user" + parse_args.return_value = args_mock - parse_config.return_value = ["pg_config", "report_path"] + parse_config.return_value = ["pg_config", "report_path"] - fetch_markets.return_value = [["config1", 1], ["config2", 2]] + fetch_markets.return_value = [["config1", 1], ["config2", 2]] - main.main(["Foo", "Bar"]) + main.main(["Foo", "Bar"]) - parse_args.assert_called_with(["Foo", "Bar"]) - parse_config.assert_called_with("config") - fetch_markets.assert_called_with("pg_config", "user") + parse_args.assert_called_with(["Foo", "Bar"]) + parse_config.assert_called_with("config") + fetch_markets.assert_called_with("pg_config", "user") - self.assertEqual(2, process.call_count) - process.assert_has_calls([ - mock.call("config1", 1, "report_path", args_mock), - mock.call("config2", 2, "report_path", args_mock), - ]) + self.assertEqual(2, process.call_count) + process.assert_has_calls([ + mock.call("config1", 1, "report_path", args_mock), + mock.call("config2", 2, "report_path", args_mock), + ]) + with self.subTest(parallel=True): + with mock.patch("main.parse_args") as parse_args,\ + mock.patch("main.parse_config") as parse_config,\ + mock.patch("main.fetch_markets") as fetch_markets,\ + mock.patch("main.process") as process,\ + mock.patch("store.Portfolio.start_worker") as start: + + args_mock = mock.Mock() + args_mock.parallel = True + args_mock.config = "config" + args_mock.user = "user" + parse_args.return_value = args_mock + + parse_config.return_value = ["pg_config", "report_path"] + + fetch_markets.return_value = [["config1", 1], ["config2", 2]] + + main.main(["Foo", "Bar"]) + + parse_args.assert_called_with(["Foo", "Bar"]) + parse_config.assert_called_with("config") + fetch_markets.assert_called_with("pg_config", "user") + + start.assert_called_once_with() + self.assertEqual(2, process.call_count) + process.assert_has_calls([ + mock.call.__bool__(), + mock.call("config1", 1, "report_path", args_mock), + mock.call.__bool__(), + mock.call("config2", 2, "report_path", args_mock), + ]) @mock.patch.object(main.sys, "exit") @mock.patch("main.configparser") @@ -3551,7 +3759,7 @@ class ProcessorTest(WebMockTestCase): method, arguments = processor.method_arguments("wait_for_recent") self.assertEqual(market.Portfolio.wait_for_recent, method) - self.assertEqual(["delta"], arguments) + self.assertEqual(["delta", "poll"], arguments) method, arguments = processor.method_arguments("prepare_trades") self.assertEqual(m.prepare_trades, method) -- 2.41.0 From 186f7d816a6bfc4fbf64027c3d7ecea10b97db38 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Mon, 12 Mar 2018 01:55:17 +0100 Subject: [PATCH 02/16] Fix fullfiled not having correct currencies --- portfolio.py | 17 ++++++------ test.py | 75 +++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 65 insertions(+), 27 deletions(-) diff --git a/portfolio.py b/portfolio.py index 0f2c011..65fdc6c 100644 --- a/portfolio.py +++ b/portfolio.py @@ -291,6 +291,7 @@ class Trade: self.orders = [] self.market = market self.closed = False + self.inverted = None assert self.value_from.value * self.value_to.value >= 0 assert self.value_from.currency == self.value_to.currency if self.value_from != 0: @@ -315,8 +316,8 @@ class Trade: else: return "dispose" - def order_action(self, inverted): - if (self.value_from < self.value_to) != inverted: + def order_action(self): + if (self.value_from < self.value_to) != self.inverted: return "buy" else: return "sell" @@ -339,7 +340,7 @@ class Trade: @property def is_fullfiled(self): - return abs(self.filled_amount(in_base_currency=True)) >= abs(self.delta) + return abs(self.filled_amount(in_base_currency=(not self.inverted))) >= abs(self.delta) def filled_amount(self, in_base_currency=False): filled_amount = 0 @@ -385,17 +386,17 @@ class Trade: if self.action is None: return None ticker = self.market.get_ticker(self.currency, self.base_currency) - inverted = ticker["inverted"] - if inverted: + self.inverted = ticker["inverted"] + if self.inverted: ticker = ticker["original"] - rate = Computation.compute_value(ticker, self.order_action(inverted), compute_value=compute_value) + rate = Computation.compute_value(ticker, self.order_action(), compute_value=compute_value) # FIXME: Dust amount should be removed from there if they werent # honored in other sales delta_in_base = abs(self.delta) # 9 BTC's worth of move (10 - 1 or 1 - 10 depending on case) - if not inverted: + if not self.inverted: base_currency = self.base_currency # BTC if self.action == "dispose": @@ -453,7 +454,7 @@ class Trade: self.market.report.log_error("prepare_order", message="Less to do than already filled: {}".format(delta)) return None - order = Order(self.order_action(inverted), + order = Order(self.order_action(), delta, rate, base_currency, self.trade_type, self.market, self, close_if_possible=close_if_possible) self.orders.append(order) diff --git a/test.py b/test.py index a45010b..33a817d 100644 --- a/test.py +++ b/test.py @@ -1375,16 +1375,20 @@ class TradeTest(WebMockTestCase): value_to = portfolio.Amount("BTC", "1.0") trade = portfolio.Trade(value_from, value_to, "ETH", self.m) - self.assertEqual("buy", trade.order_action(False)) - self.assertEqual("sell", trade.order_action(True)) + trade.inverted = False + self.assertEqual("buy", trade.order_action()) + trade.inverted = True + self.assertEqual("sell", trade.order_action()) value_from = portfolio.Amount("BTC", "0") value_from.linked_to = portfolio.Amount("ETH", "0") value_to = portfolio.Amount("BTC", "-1.0") trade = portfolio.Trade(value_from, value_to, "ETH", self.m) - self.assertEqual("sell", trade.order_action(False)) - self.assertEqual("buy", trade.order_action(True)) + trade.inverted = False + self.assertEqual("sell", trade.order_action()) + trade.inverted = True + self.assertEqual("buy", trade.order_action()) def test_trade_type(self): value_from = portfolio.Amount("BTC", "0.5") @@ -1402,26 +1406,59 @@ class TradeTest(WebMockTestCase): self.assertEqual("short", trade.trade_type) def test_is_fullfiled(self): - value_from = portfolio.Amount("BTC", "0.5") - value_from.linked_to = portfolio.Amount("ETH", "10.0") - value_to = portfolio.Amount("BTC", "1.0") - trade = portfolio.Trade(value_from, value_to, "ETH", self.m) + with self.subTest(inverted=False): + value_from = portfolio.Amount("BTC", "0.5") + value_from.linked_to = portfolio.Amount("ETH", "10.0") + value_to = portfolio.Amount("BTC", "1.0") + trade = portfolio.Trade(value_from, value_to, "ETH", self.m) - order1 = mock.Mock() - order1.filled_amount.return_value = portfolio.Amount("BTC", "0.3") + order1 = mock.Mock() + order1.filled_amount.return_value = portfolio.Amount("BTC", "0.3") - order2 = mock.Mock() - order2.filled_amount.return_value = portfolio.Amount("BTC", "0.01") - trade.orders.append(order1) - trade.orders.append(order2) + order2 = mock.Mock() + order2.filled_amount.return_value = portfolio.Amount("BTC", "0.01") + trade.orders.append(order1) + trade.orders.append(order2) + + self.assertFalse(trade.is_fullfiled) + + order3 = mock.Mock() + order3.filled_amount.return_value = portfolio.Amount("BTC", "0.19") + trade.orders.append(order3) + + self.assertTrue(trade.is_fullfiled) + + order1.filled_amount.assert_called_with(in_base_currency=True) + order2.filled_amount.assert_called_with(in_base_currency=True) + order3.filled_amount.assert_called_with(in_base_currency=True) + + with self.subTest(inverted=True): + value_from = portfolio.Amount("BTC", "0.5") + value_from.linked_to = portfolio.Amount("USDT", "1000.0") + value_to = portfolio.Amount("BTC", "1.0") + trade = portfolio.Trade(value_from, value_to, "USDT", self.m) + trade.inverted = True + + order1 = mock.Mock() + order1.filled_amount.return_value = portfolio.Amount("BTC", "0.3") + + order2 = mock.Mock() + order2.filled_amount.return_value = portfolio.Amount("BTC", "0.01") + trade.orders.append(order1) + trade.orders.append(order2) + + self.assertFalse(trade.is_fullfiled) + + order3 = mock.Mock() + order3.filled_amount.return_value = portfolio.Amount("BTC", "0.19") + trade.orders.append(order3) - self.assertFalse(trade.is_fullfiled) + self.assertTrue(trade.is_fullfiled) - order3 = mock.Mock() - order3.filled_amount.return_value = portfolio.Amount("BTC", "0.19") - trade.orders.append(order3) + order1.filled_amount.assert_called_with(in_base_currency=False) + order2.filled_amount.assert_called_with(in_base_currency=False) + order3.filled_amount.assert_called_with(in_base_currency=False) - self.assertTrue(trade.is_fullfiled) def test_filled_amount(self): value_from = portfolio.Amount("BTC", "0.5") -- 2.41.0 From d8deb0e9a6b7b2805e61dc19a287d5474c271cc5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Mon, 12 Mar 2018 01:55:46 +0100 Subject: [PATCH 03/16] Fix mark finished order not alway called when necessary --- portfolio.py | 9 ++++----- test.py | 13 +++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/portfolio.py b/portfolio.py index 65fdc6c..ed50b57 100644 --- a/portfolio.py +++ b/portfolio.py @@ -550,7 +550,7 @@ class Order: @property def finished(self): - return self.status == "closed" or self.status == "canceled" or self.status == "error" + return self.status.startswith("closed") or self.status == "canceled" or self.status == "error" @retry(InsufficientFunds) def run(self): @@ -594,15 +594,13 @@ class Order: # other states are "closed" and "canceled" if not self.finished: self.fetch() - if self.finished: - self.mark_finished_order() return self.status def mark_finished_order(self): - if self.market.debug: + if self.status.startswith("closed") and self.market.debug: self.market.report.log_debug_action("Mark {} as finished".format(self)) return - if self.status == "closed": + if self.status.startswith("closed"): if self.trade_type == "short" and self.action == "buy" and self.close_if_possible: self.market.ccxt.close_margin_position(self.amount.currency, self.base_currency) @@ -621,6 +619,7 @@ class Order: self.fetch_mouvements() + self.mark_finished_order() # FIXME: consider open order with dust remaining as closed def dust_amount_remaining(self): diff --git a/test.py b/test.py index 33a817d..921af9f 100644 --- a/test.py +++ b/test.py @@ -2118,7 +2118,8 @@ class OrderTest(WebMockTestCase): self.m.report.log_debug_action.assert_called_once() @mock.patch.object(portfolio.Order, "fetch_mouvements") - def test_fetch(self, fetch_mouvements): + @mock.patch.object(portfolio.Order, "mark_finished_order") + def test_fetch(self, mark_finished_order, fetch_mouvements): order = portfolio.Order("buy", portfolio.Amount("ETH", 10), D("0.1"), "BTC", "long", self.m, "trade") order.id = 45 @@ -2128,6 +2129,7 @@ class OrderTest(WebMockTestCase): self.m.report.log_debug_action.assert_called_once() self.m.report.log_debug_action.reset_mock() self.m.ccxt.fetch_order.assert_not_called() + mark_finished_order.assert_not_called() fetch_mouvements.assert_not_called() with self.subTest(debug=False): @@ -2144,17 +2146,19 @@ class OrderTest(WebMockTestCase): self.assertEqual("timestamp", order.timestamp) self.assertEqual(1, len(order.results)) self.m.report.log_debug_action.assert_not_called() + mark_finished_order.assert_called_once() + mark_finished_order.reset_mock() with self.subTest(missing_order=True): self.m.ccxt.fetch_order.side_effect = [ portfolio.OrderNotCached, ] order.fetch() self.assertEqual("closed_unknown", order.status) + mark_finished_order.assert_called_once() @mock.patch.object(portfolio.Order, "fetch") - @mock.patch.object(portfolio.Order, "mark_finished_order") - def test_get_status(self, mark_finished_order, fetch): + def test_get_status(self, fetch): with self.subTest(debug=True): self.m.debug = True order = portfolio.Order("buy", portfolio.Amount("ETH", 10), @@ -2173,10 +2177,8 @@ class OrderTest(WebMockTestCase): return update_status fetch.side_effect = _fetch(order) self.assertEqual("open", order.get_status()) - mark_finished_order.assert_not_called() fetch.assert_called_once() - mark_finished_order.reset_mock() fetch.reset_mock() with self.subTest(debug=False, finished=True): self.m.debug = False @@ -2188,7 +2190,6 @@ class OrderTest(WebMockTestCase): return update_status fetch.side_effect = _fetch(order) self.assertEqual("closed", order.get_status()) - mark_finished_order.assert_called_once() fetch.assert_called_once() def test_run(self): -- 2.41.0 From 718e3e919acb9b57269d3155543f9e8ad1b91324 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Mon, 12 Mar 2018 02:54:13 +0100 Subject: [PATCH 04/16] Store printed logs --- market.py | 6 ++++-- store.py | 8 +++++++- test.py | 10 ++++++++-- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/market.py b/market.py index 8672c59..2ddebfa 100644 --- a/market.py +++ b/market.py @@ -49,9 +49,11 @@ class Market: self.report.merge(Portfolio.report) try: if self.report_path is not None: - report_file = "{}/{}_{}.json".format(self.report_path, datetime.now().isoformat(), self.user_id) - with open(report_file, "w") as f: + report_file = "{}/{}_{}".format(self.report_path, datetime.now().isoformat(), self.user_id) + with open(report_file + ".json", "w") as f: f.write(self.report.to_json()) + with open(report_file + ".log", "w") as f: + f.write("\n".join(map(lambda x: x[1], self.report.print_logs))) except Exception as e: print("impossible to store report file: {}; {}".format(e.__class__.__name__, e)) diff --git a/store.py b/store.py index f655be5..d875a98 100644 --- a/store.py +++ b/store.py @@ -15,14 +15,20 @@ class ReportStore: self.market = market self.verbose_print = verbose_print + self.print_logs = [] self.logs = [] def merge(self, other_report): self.logs += other_report.logs self.logs.sort(key=lambda x: x["date"]) + self.print_logs += other_report.print_logs + self.print_logs.sort(key=lambda x: x[0]) + def print_log(self, message): - message = str(message) + now = datetime.now() + message = "{:%Y-%m-%d %H:%M:%S}: {}".format(now, str(message)) + self.print_logs.append([now, message]) if self.verbose_print: print(message) diff --git a/test.py b/test.py index ac9a6cd..13bd332 100644 --- a/test.py +++ b/test.py @@ -1395,12 +1395,15 @@ class MarketTest(WebMockTestCase): mock.patch.object(market, "datetime") as time_mock: time_mock.now.return_value = datetime.datetime(2018, 2, 25) + report.print_logs = [[time_mock.now(), "Foo"], [time_mock.now(), "Bar"]] report.to_json.return_value = "json_content" m.store_report() file_open.assert_any_call("present/2018-02-25T00:00:00_1.json", "w") - file_open().write.assert_called_once_with("json_content") + file_open.assert_any_call("present/2018-02-25T00:00:00_1.log", "w") + file_open().write.assert_any_call("json_content") + file_open().write.assert_any_call("Foo\nBar") m.report.to_json.assert_called_once_with() report.merge.assert_called_with(store.Portfolio.report) @@ -3016,15 +3019,18 @@ class ReportStoreTest(WebMockTestCase): self.assertEqual(3, len(report_store1.logs)) self.assertEqual(["1", "2", "3"], list(map(lambda x: x["stage"], report_store1.logs))) + self.assertEqual(6, len(report_store1.print_logs)) def test_print_log(self): report_store = market.ReportStore(self.m) with self.subTest(verbose=True),\ + mock.patch.object(store, "datetime") as time_mock,\ mock.patch('sys.stdout', new_callable=StringIO) as stdout_mock: + time_mock.now.return_value = datetime.datetime(2018, 2, 25, 2, 20, 10) report_store.set_verbose(True) report_store.print_log("Coucou") report_store.print_log(portfolio.Amount("BTC", 1)) - self.assertEqual(stdout_mock.getvalue(), "Coucou\n1.00000000 BTC\n") + self.assertEqual(stdout_mock.getvalue(), "2018-02-25 02:20:10: Coucou\n2018-02-25 02:20:10: 1.00000000 BTC\n") with self.subTest(verbose=False),\ mock.patch('sys.stdout', new_callable=StringIO) as stdout_mock: -- 2.41.0 From 07fa7a4bf8f7a6f799120fb9a5965a09bea6c38e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Thu, 22 Mar 2018 21:35:00 +0100 Subject: [PATCH 05/16] Add quiet flag for running --- main.py | 8 ++++++-- market.py | 11 ++++++----- test.py | 51 +++++++++++++++++++++++++++++++-------------------- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/main.py b/main.py index 856d449..55981bf 100644 --- a/main.py +++ b/main.py @@ -63,7 +63,8 @@ def make_order(market, value, currency, action="acquire", def get_user_market(config_path, user_id, debug=False): pg_config, report_path = parse_config(config_path) market_config = list(fetch_markets(pg_config, str(user_id)))[0][0] - return market.Market.from_config(market_config, debug=debug) + args = type('Args', (object,), { "debug": debug, "quiet": False })() + return market.Market.from_config(market_config, args, user_id=user_id, report_path=report_path) def fetch_markets(pg_config, user): connection = psycopg2.connect(**pg_config) @@ -109,6 +110,9 @@ def parse_args(argv): parser.add_argument("--after", default=False, action='store_const', const=True, help="Run the steps after the cryptoportfolio update") + parser.add_argument("--quiet", + default=False, action='store_const', const=True, + help="Don't print messages") parser.add_argument("--debug", default=False, action='store_const', const=True, help="Run in debug mode") @@ -131,7 +135,7 @@ def parse_args(argv): def process(market_config, user_id, report_path, args): try: market.Market\ - .from_config(market_config, debug=args.debug, user_id=user_id, report_path=report_path)\ + .from_config(market_config, args, user_id=user_id, report_path=report_path)\ .process(args.action, before=args.before, after=args.after) except Exception as e: print("{}: {}".format(e.__class__.__name__, e)) diff --git a/market.py b/market.py index 2ddebfa..fc5832c 100644 --- a/market.py +++ b/market.py @@ -13,11 +13,12 @@ class Market: trades = None balances = None - def __init__(self, ccxt_instance, debug=False, user_id=None, report_path=None): - self.debug = debug + def __init__(self, ccxt_instance, args, user_id=None, report_path=None): + self.args = args + self.debug = args.debug self.ccxt = ccxt_instance self.ccxt._market = self - self.report = ReportStore(self) + self.report = ReportStore(self, verbose_print=(not args.quiet)) self.trades = TradeStore(self) self.balances = BalanceStore(self) self.processor = Processor(self) @@ -26,7 +27,7 @@ class Market: self.report_path = report_path @classmethod - def from_config(cls, config, debug=False, user_id=None, report_path=None): + def from_config(cls, config, args, user_id=None, report_path=None): config["apiKey"] = config.pop("key", None) ccxt_instance = ccxt.poloniexE(config) @@ -43,7 +44,7 @@ class Market: ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, ccxt_instance.session.__class__) - return cls(ccxt_instance, debug=debug, user_id=user_id, report_path=report_path) + return cls(ccxt_instance, args, user_id=user_id, report_path=report_path) def store_report(self): self.report.merge(Portfolio.report) diff --git a/test.py b/test.py index 13bd332..3ee34c6 100644 --- a/test.py +++ b/test.py @@ -23,6 +23,9 @@ for test_type in limits: class WebMockTestCase(unittest.TestCase): import time + def market_args(self, debug=False, quiet=False): + return type('Args', (object,), { "debug": debug, "quiet": quiet })() + def setUp(self): super(WebMockTestCase, self).setUp() self.wm = requests_mock.Mocker() @@ -1092,7 +1095,7 @@ class MarketTest(WebMockTestCase): self.ccxt = mock.Mock(spec=market.ccxt.poloniexE) def test_values(self): - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) self.assertEqual(self.ccxt, m.ccxt) self.assertFalse(m.debug) @@ -1104,19 +1107,27 @@ class MarketTest(WebMockTestCase): self.assertEqual(m, m.balances.market) self.assertEqual(m, m.ccxt._market) - m = market.Market(self.ccxt, debug=True) + m = market.Market(self.ccxt, self.market_args(debug=True)) self.assertTrue(m.debug) - m = market.Market(self.ccxt, debug=False) + m = market.Market(self.ccxt, self.market_args(debug=False)) self.assertFalse(m.debug) + with mock.patch("market.ReportStore") as report_store: + with self.subTest(quiet=False): + m = market.Market(self.ccxt, self.market_args(quiet=False)) + report_store.assert_called_with(m, verbose_print=True) + with self.subTest(quiet=True): + m = market.Market(self.ccxt, self.market_args(quiet=True)) + report_store.assert_called_with(m, verbose_print=False) + @mock.patch("market.ccxt") def test_from_config(self, ccxt): with mock.patch("market.ReportStore"): ccxt.poloniexE.return_value = self.ccxt self.ccxt.session.request.return_value = "response" - m = market.Market.from_config({"key": "key", "secred": "secret"}) + m = market.Market.from_config({"key": "key", "secred": "secret"}, self.market_args()) self.assertEqual(self.ccxt, m.ccxt) @@ -1125,7 +1136,7 @@ class MarketTest(WebMockTestCase): m.report.log_http_request.assert_called_with('GET', 'URL', 'data', 'headers', 'response') - m = market.Market.from_config({"key": "key", "secred": "secret"}, debug=True) + m = market.Market.from_config({"key": "key", "secred": "secret"}, self.market_args(debug=True)) self.assertEqual(True, m.debug) def test_get_tickers(self): @@ -1134,7 +1145,7 @@ class MarketTest(WebMockTestCase): market.NotSupported ] - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) self.assertEqual("tickers", m.get_tickers()) self.assertEqual("tickers", m.get_tickers()) self.ccxt.fetch_tickers.assert_called_once() @@ -1147,7 +1158,7 @@ class MarketTest(WebMockTestCase): "ETH/ETC": { "bid": 1, "ask": 3 }, "XVG/ETH": { "bid": 10, "ask": 40 }, } - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) ticker = m.get_ticker("ETH", "ETC") self.assertEqual(1, ticker["bid"]) @@ -1175,7 +1186,7 @@ class MarketTest(WebMockTestCase): market.ExchangeError("foo"), ] - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) ticker = m.get_ticker("ETH", "ETC") self.ccxt.fetch_ticker.assert_called_with("ETH/ETC") @@ -1195,7 +1206,7 @@ class MarketTest(WebMockTestCase): self.assertIsNone(ticker) def test_fetch_fees(self): - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) self.ccxt.fetch_fees.return_value = "Foo" self.assertEqual("Foo", m.fetch_fees()) self.ccxt.fetch_fees.assert_called_once() @@ -1222,7 +1233,7 @@ class MarketTest(WebMockTestCase): get_ticker.side_effect = _get_ticker with mock.patch("market.ReportStore"): - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) self.ccxt.fetch_all_balances.return_value = { "USDT": { "exchange_free": D("10000.0"), @@ -1262,7 +1273,7 @@ class MarketTest(WebMockTestCase): (False, 12), (True, 12)]: with self.subTest(sleep=sleep, debug=debug), \ mock.patch("market.ReportStore"): - m = market.Market(self.ccxt, debug=debug) + m = market.Market(self.ccxt, self.market_args(debug=debug)) order_mock1 = mock.Mock() order_mock2 = mock.Mock() @@ -1339,7 +1350,7 @@ class MarketTest(WebMockTestCase): for debug in [True, False]: with self.subTest(debug=debug),\ mock.patch("market.ReportStore"): - m = market.Market(self.ccxt, debug=debug) + m = market.Market(self.ccxt, self.market_args(debug=debug)) value_from = portfolio.Amount("BTC", "1.0") value_from.linked_to = portfolio.Amount("ETH", "10.0") @@ -1378,7 +1389,7 @@ class MarketTest(WebMockTestCase): def test_store_report(self): file_open = mock.mock_open() - m = market.Market(self.ccxt, user_id=1) + m = market.Market(self.ccxt, self.market_args(), user_id=1) with self.subTest(file=None),\ mock.patch.object(m, "report") as report,\ mock.patch("market.open", file_open): @@ -1388,7 +1399,7 @@ class MarketTest(WebMockTestCase): report.reset_mock() file_open = mock.mock_open() - m = market.Market(self.ccxt, report_path="present", user_id=1) + m = market.Market(self.ccxt, self.market_args(), report_path="present", user_id=1) with self.subTest(file="present"),\ mock.patch("market.open", file_open),\ mock.patch.object(m, "report") as report,\ @@ -1409,7 +1420,7 @@ class MarketTest(WebMockTestCase): report.reset_mock() - m = market.Market(self.ccxt, report_path="error", user_id=1) + m = market.Market(self.ccxt, self.market_args(), report_path="error", user_id=1) with self.subTest(file="error"),\ mock.patch("market.open") as file_open,\ mock.patch.object(m, "report") as report,\ @@ -1422,7 +1433,7 @@ class MarketTest(WebMockTestCase): self.assertRegex(stdout_mock.getvalue(), "impossible to store report file: FileNotFoundError;") def test_print_orders(self): - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) with mock.patch.object(m.report, "log_stage") as log_stage,\ mock.patch.object(m.balances, "fetch_balances") as fetch_balances,\ mock.patch.object(m, "prepare_trades") as prepare_trades,\ @@ -1436,7 +1447,7 @@ class MarketTest(WebMockTestCase): prepare_orders.assert_called_with(compute_value="average") def test_print_balances(self): - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) with mock.patch.object(m.balances, "in_currency") as in_currency,\ mock.patch.object(m.report, "log_stage") as log_stage,\ @@ -1461,7 +1472,7 @@ class MarketTest(WebMockTestCase): @mock.patch("market.ReportStore.log_error") @mock.patch("market.Market.store_report") def test_process(self, store_report, log_error, process): - m = market.Market(self.ccxt) + m = market.Market(self.ccxt, self.market_args()) with self.subTest(before=False, after=False): m.process(None) @@ -3571,7 +3582,7 @@ class MainTest(WebMockTestCase): main.process("config", 1, "report_path", args_mock) market_mock.from_config.assert_has_calls([ - mock.call("config", debug="debug", user_id=1, report_path="report_path"), + mock.call("config", args_mock, user_id=1, report_path="report_path"), mock.call().process("action", before="before", after="after"), ]) @@ -3797,7 +3808,7 @@ class ProcessorTest(WebMockTestCase): def test_method_arguments(self): ccxt = mock.Mock(spec=market.ccxt.poloniexE) - m = market.Market(ccxt) + m = market.Market(ccxt, self.market_args()) processor = market.Processor(m) -- 2.41.0 From b4e0ba0b0aa84550d0b06338b59557c3050798c9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Fri, 23 Mar 2018 01:11:34 +0100 Subject: [PATCH 06/16] Store reports to database Fixes https://git.immae.eu/mantisbt/view.php?id=57 --- main.py | 22 +++++--- market.py | 49 +++++++++++++--- store.py | 20 +++++-- test.py | 165 ++++++++++++++++++++++++++++++++++++++++++++---------- 4 files changed, 202 insertions(+), 54 deletions(-) diff --git a/main.py b/main.py index 55981bf..3e98289 100644 --- a/main.py +++ b/main.py @@ -62,7 +62,7 @@ def make_order(market, value, currency, action="acquire", def get_user_market(config_path, user_id, debug=False): pg_config, report_path = parse_config(config_path) - market_config = list(fetch_markets(pg_config, str(user_id)))[0][0] + market_config = list(fetch_markets(pg_config, str(user_id)))[0][1] args = type('Args', (object,), { "debug": debug, "quiet": False })() return market.Market.from_config(market_config, args, user_id=user_id, report_path=report_path) @@ -71,9 +71,9 @@ def fetch_markets(pg_config, user): cursor = connection.cursor() if user is None: - cursor.execute("SELECT config,user_id FROM market_configs") + cursor.execute("SELECT id,config,user_id FROM market_configs") else: - cursor.execute("SELECT config,user_id FROM market_configs WHERE user_id = %s", user) + cursor.execute("SELECT id,config,user_id FROM market_configs WHERE user_id = %s", user) for row in cursor: yield row @@ -132,10 +132,12 @@ def parse_args(argv): return args -def process(market_config, user_id, report_path, args): +def process(market_id, market_config, user_id, report_path, args, pg_config): try: market.Market\ - .from_config(market_config, args, user_id=user_id, report_path=report_path)\ + .from_config(market_config, args, + pg_config=pg_config, market_id=market_id, + user_id=user_id, report_path=report_path)\ .process(args.action, before=args.before, after=args.after) except Exception as e: print("{}: {}".format(e.__class__.__name__, e)) @@ -149,11 +151,13 @@ def main(argv): import threading market.Portfolio.start_worker() - for market_config, user_id in fetch_markets(pg_config, args.user): - threading.Thread(target=process, args=[market_config, user_id, report_path, args]).start() + for row in fetch_markets(pg_config, args.user): + threading.Thread(target=process, args=[ + *row, report_path, args, pg_config + ]).start() else: - for market_config, user_id in fetch_markets(pg_config, args.user): - process(market_config, user_id, report_path, args) + for row in fetch_markets(pg_config, args.user): + process(*row, report_path, args, pg_config) if __name__ == '__main__': # pragma: no cover main(sys.argv[1:]) diff --git a/market.py b/market.py index fc5832c..78ced1a 100644 --- a/market.py +++ b/market.py @@ -1,6 +1,7 @@ from ccxt import ExchangeError, NotSupported import ccxt_wrapper as ccxt import time +import psycopg2 from store import * from cachetools.func import ttl_cache from datetime import datetime @@ -13,7 +14,9 @@ class Market: trades = None balances = None - def __init__(self, ccxt_instance, args, user_id=None, report_path=None): + def __init__(self, ccxt_instance, args, + user_id=None, market_id=None, + report_path=None, pg_config=None): self.args = args self.debug = args.debug self.ccxt = ccxt_instance @@ -24,10 +27,13 @@ class Market: self.processor = Processor(self) self.user_id = user_id + self.market_id = market_id self.report_path = report_path + self.pg_config = pg_config @classmethod - def from_config(cls, config, args, user_id=None, report_path=None): + def from_config(cls, config, args, + user_id=None, market_id=None, report_path=None, pg_config=None): config["apiKey"] = config.pop("key", None) ccxt_instance = ccxt.poloniexE(config) @@ -44,20 +50,45 @@ class Market: ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, ccxt_instance.session.__class__) - return cls(ccxt_instance, args, user_id=user_id, report_path=report_path) + return cls(ccxt_instance, args, + user_id=user_id, market_id=market_id, + pg_config=pg_config, report_path=report_path) def store_report(self): self.report.merge(Portfolio.report) + date = datetime.now() + if self.report_path is not None: + self.store_file_report(date) + if self.pg_config is not None: + self.store_database_report(date) + + def store_file_report(self, date): try: - if self.report_path is not None: - report_file = "{}/{}_{}".format(self.report_path, datetime.now().isoformat(), self.user_id) - with open(report_file + ".json", "w") as f: - f.write(self.report.to_json()) - with open(report_file + ".log", "w") as f: - f.write("\n".join(map(lambda x: x[1], self.report.print_logs))) + report_file = "{}/{}_{}".format(self.report_path, date.isoformat(), self.user_id) + with open(report_file + ".json", "w") as f: + f.write(self.report.to_json()) + with open(report_file + ".log", "w") as f: + f.write("\n".join(map(lambda x: x[1], self.report.print_logs))) except Exception as e: print("impossible to store report file: {}; {}".format(e.__class__.__name__, e)) + def store_database_report(self, date): + try: + report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' + line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' + connection = psycopg2.connect(**self.pg_config) + cursor = connection.cursor() + cursor.execute(report_query, (date, self.market_id, self.debug)) + report_id = cursor.fetchone()[0] + for date, type_, payload in self.report.to_json_array(): + cursor.execute(line_query, (date, report_id, type_, payload)) + + connection.commit() + cursor.close() + connection.close() + except Exception as e: + print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) + def process(self, actions, before=False, after=False): try: if len(actions or []) == 0: diff --git a/store.py b/store.py index d875a98..b3ada45 100644 --- a/store.py +++ b/store.py @@ -36,12 +36,22 @@ class ReportStore: hash_["date"] = datetime.now() self.logs.append(hash_) + @staticmethod + def default_json_serial(obj): + if isinstance(obj, (datetime, date)): + return obj.isoformat() + return str(obj) + def to_json(self): - def default_json_serial(obj): - if isinstance(obj, (datetime, date)): - return obj.isoformat() - return str(obj) - return json.dumps(self.logs, default=default_json_serial, indent=" ") + return json.dumps(self.logs, default=self.default_json_serial, indent=" ") + + def to_json_array(self): + for log in (x.copy() for x in self.logs): + yield ( + log.pop("date"), + log.pop("type"), + json.dumps(log, default=self.default_json_serial, indent=" ") + ) def set_verbose(self, verbose_print): self.verbose_print = verbose_print diff --git a/test.py b/test.py index 3ee34c6..5b9c56c 100644 --- a/test.py +++ b/test.py @@ -1386,18 +1386,7 @@ class MarketTest(WebMockTestCase): self.ccxt.transfer_balance.assert_any_call("USDT", 100, "exchange", "margin") self.ccxt.transfer_balance.assert_any_call("ETC", 5, "margin", "exchange") - def test_store_report(self): - - file_open = mock.mock_open() - m = market.Market(self.ccxt, self.market_args(), user_id=1) - with self.subTest(file=None),\ - mock.patch.object(m, "report") as report,\ - mock.patch("market.open", file_open): - m.store_report() - report.merge.assert_called_with(store.Portfolio.report) - file_open.assert_not_called() - - report.reset_mock() + def test_store_file_report(self): file_open = mock.mock_open() m = market.Market(self.ccxt, self.market_args(), report_path="present", user_id=1) with self.subTest(file="present"),\ @@ -1405,20 +1394,16 @@ class MarketTest(WebMockTestCase): mock.patch.object(m, "report") as report,\ mock.patch.object(market, "datetime") as time_mock: - time_mock.now.return_value = datetime.datetime(2018, 2, 25) report.print_logs = [[time_mock.now(), "Foo"], [time_mock.now(), "Bar"]] report.to_json.return_value = "json_content" - m.store_report() + m.store_file_report(datetime.datetime(2018, 2, 25)) file_open.assert_any_call("present/2018-02-25T00:00:00_1.json", "w") file_open.assert_any_call("present/2018-02-25T00:00:00_1.log", "w") file_open().write.assert_any_call("json_content") file_open().write.assert_any_call("Foo\nBar") m.report.to_json.assert_called_once_with() - report.merge.assert_called_with(store.Portfolio.report) - - report.reset_mock() m = market.Market(self.ccxt, self.market_args(), report_path="error", user_id=1) with self.subTest(file="error"),\ @@ -1427,10 +1412,106 @@ class MarketTest(WebMockTestCase): mock.patch('sys.stdout', new_callable=StringIO) as stdout_mock: file_open.side_effect = FileNotFoundError + m.store_file_report(datetime.datetime(2018, 2, 25)) + + self.assertRegex(stdout_mock.getvalue(), "impossible to store report file: FileNotFoundError;") + + @mock.patch.object(market, "psycopg2") + def test_store_database_report(self, psycopg2): + connect_mock = mock.Mock() + cursor_mock = mock.MagicMock() + + connect_mock.cursor.return_value = cursor_mock + psycopg2.connect.return_value = connect_mock + m = market.Market(self.ccxt, self.market_args(), + pg_config={"config": "pg_config"}, user_id=1) + cursor_mock.fetchone.return_value = [42] + + with self.subTest(error=False),\ + mock.patch.object(m, "report") as report: + report.to_json_array.return_value = [ + ("date1", "type1", "payload1"), + ("date2", "type2", "payload2"), + ] + m.store_database_report(datetime.datetime(2018, 3, 24)) + connect_mock.assert_has_calls([ + mock.call.cursor(), + mock.call.cursor().execute('INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;', (datetime.datetime(2018, 3, 24), None, False)), + mock.call.cursor().fetchone(), + mock.call.cursor().execute('INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);', ('date1', 42, 'type1', 'payload1')), + mock.call.cursor().execute('INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);', ('date2', 42, 'type2', 'payload2')), + mock.call.commit(), + mock.call.cursor().close(), + mock.call.close() + ]) + + connect_mock.reset_mock() + with self.subTest(error=True),\ + mock.patch('sys.stdout', new_callable=StringIO) as stdout_mock: + psycopg2.connect.side_effect = Exception("Bouh") + m.store_database_report(datetime.datetime(2018, 3, 24)) + self.assertEqual(stdout_mock.getvalue(), "impossible to store report to database: Exception; Bouh\n") + + def test_store_report(self): + m = market.Market(self.ccxt, self.market_args(), user_id=1) + with self.subTest(file=None, pg_config=None),\ + mock.patch.object(m, "report") as report,\ + mock.patch.object(m, "store_database_report") as db_report,\ + mock.patch.object(m, "store_file_report") as file_report: + m.store_report() + report.merge.assert_called_with(store.Portfolio.report) + + file_report.assert_not_called() + db_report.assert_not_called() + + report.reset_mock() + m = market.Market(self.ccxt, self.market_args(), report_path="present", user_id=1) + with self.subTest(file="present", pg_config=None),\ + mock.patch.object(m, "report") as report,\ + mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_database_report") as db_report,\ + mock.patch.object(market, "datetime") as time_mock: + + time_mock.now.return_value = datetime.datetime(2018, 2, 25) + m.store_report() report.merge.assert_called_with(store.Portfolio.report) - self.assertRegex(stdout_mock.getvalue(), "impossible to store report file: FileNotFoundError;") + file_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) + db_report.assert_not_called() + + report.reset_mock() + m = market.Market(self.ccxt, self.market_args(), pg_config="present", user_id=1) + with self.subTest(file=None, pg_config="present"),\ + mock.patch.object(m, "report") as report,\ + mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_database_report") as db_report,\ + mock.patch.object(market, "datetime") as time_mock: + + time_mock.now.return_value = datetime.datetime(2018, 2, 25) + + m.store_report() + + report.merge.assert_called_with(store.Portfolio.report) + file_report.assert_not_called() + db_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) + + report.reset_mock() + m = market.Market(self.ccxt, self.market_args(), + pg_config="pg_config", report_path="present", user_id=1) + with self.subTest(file="present", pg_config="present"),\ + mock.patch.object(m, "report") as report,\ + mock.patch.object(m, "store_file_report") as file_report,\ + mock.patch.object(m, "store_database_report") as db_report,\ + mock.patch.object(market, "datetime") as time_mock: + + time_mock.now.return_value = datetime.datetime(2018, 2, 25) + + m.store_report() + + report.merge.assert_called_with(store.Portfolio.report) + file_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) + db_report.assert_called_once_with(datetime.datetime(2018, 2, 25)) def test_print_orders(self): m = market.Market(self.ccxt, self.market_args()) @@ -3050,6 +3131,14 @@ class ReportStoreTest(WebMockTestCase): report_store.print_log(portfolio.Amount("BTC", 1)) self.assertEqual(stdout_mock.getvalue(), "") + def test_default_json_serial(self): + report_store = market.ReportStore(self.m) + + self.assertEqual("2018-02-24T00:00:00", + report_store.default_json_serial(portfolio.datetime(2018, 2, 24))) + self.assertEqual("1.00000000 BTC", + report_store.default_json_serial(portfolio.Amount("BTC", 1))) + def test_to_json(self): report_store = market.ReportStore(self.m) report_store.logs.append({"foo": "bar"}) @@ -3059,6 +3148,20 @@ class ReportStoreTest(WebMockTestCase): report_store.logs.append({"amount": portfolio.Amount("BTC", 1)}) self.assertEqual('[\n {\n "foo": "bar"\n },\n {\n "date": "2018-02-24T00:00:00"\n },\n {\n "amount": "1.00000000 BTC"\n }\n]', report_store.to_json()) + def test_to_json_array(self): + report_store = market.ReportStore(self.m) + report_store.logs.append({ + "date": "date1", "type": "type1", "foo": "bar", "bla": "bla" + }) + report_store.logs.append({ + "date": "date2", "type": "type2", "foo": "bar", "bla": "bla" + }) + logs = list(report_store.to_json_array()) + + self.assertEqual(2, len(logs)) + self.assertEqual(("date1", "type1", '{\n "foo": "bar",\n "bla": "bla"\n}'), logs[0]) + self.assertEqual(("date2", "type2", '{\n "foo": "bar",\n "bla": "bla"\n}'), logs[1]) + @mock.patch.object(market.ReportStore, "print_log") @mock.patch.object(market.ReportStore, "add_log") def test_log_stage(self, add_log, print_log): @@ -3552,7 +3655,7 @@ class MainTest(WebMockTestCase): mock.patch("main.parse_config") as main_parse_config: with self.subTest(debug=False): main_parse_config.return_value = ["pg_config", "report_path"] - main_fetch_markets.return_value = [({"key": "market_config"},)] + main_fetch_markets.return_value = [(1, {"key": "market_config"}, 3)] m = main.get_user_market("config_path.ini", 1) self.assertIsInstance(m, market.Market) @@ -3560,7 +3663,7 @@ class MainTest(WebMockTestCase): with self.subTest(debug=True): main_parse_config.return_value = ["pg_config", "report_path"] - main_fetch_markets.return_value = [({"key": "market_config"},)] + main_fetch_markets.return_value = [(1, {"key": "market_config"}, 3)] m = main.get_user_market("config_path.ini", 1, debug=True) self.assertIsInstance(m, market.Market) @@ -3579,16 +3682,16 @@ class MainTest(WebMockTestCase): args_mock.after = "after" self.assertEqual("", stdout_mock.getvalue()) - main.process("config", 1, "report_path", args_mock) + main.process(3, "config", 1, "report_path", args_mock, "pg_config") market_mock.from_config.assert_has_calls([ - mock.call("config", args_mock, user_id=1, report_path="report_path"), + mock.call("config", args_mock, pg_config="pg_config", market_id=3, user_id=1, report_path="report_path"), mock.call().process("action", before="before", after="after"), ]) with self.subTest(exception=True): market_mock.from_config.side_effect = Exception("boo") - main.process("config", 1, "report_path", args_mock) + main.process(3, "config", 1, "report_path", args_mock, "pg_config") self.assertEqual("Exception: boo\n", stdout_mock.getvalue()) def test_main(self): @@ -3606,7 +3709,7 @@ class MainTest(WebMockTestCase): parse_config.return_value = ["pg_config", "report_path"] - fetch_markets.return_value = [["config1", 1], ["config2", 2]] + fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]] main.main(["Foo", "Bar"]) @@ -3616,8 +3719,8 @@ class MainTest(WebMockTestCase): self.assertEqual(2, process.call_count) process.assert_has_calls([ - mock.call("config1", 1, "report_path", args_mock), - mock.call("config2", 2, "report_path", args_mock), + mock.call(3, "config1", 1, "report_path", args_mock, "pg_config"), + mock.call(1, "config2", 2, "report_path", args_mock, "pg_config"), ]) with self.subTest(parallel=True): with mock.patch("main.parse_args") as parse_args,\ @@ -3634,7 +3737,7 @@ class MainTest(WebMockTestCase): parse_config.return_value = ["pg_config", "report_path"] - fetch_markets.return_value = [["config1", 1], ["config2", 2]] + fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]] main.main(["Foo", "Bar"]) @@ -3646,9 +3749,9 @@ class MainTest(WebMockTestCase): self.assertEqual(2, process.call_count) process.assert_has_calls([ mock.call.__bool__(), - mock.call("config1", 1, "report_path", args_mock), + mock.call(3, "config1", 1, "report_path", args_mock, "pg_config"), mock.call.__bool__(), - mock.call("config2", 2, "report_path", args_mock), + mock.call(1, "config2", 2, "report_path", args_mock, "pg_config"), ]) @mock.patch.object(main.sys, "exit") @@ -3734,7 +3837,7 @@ class MainTest(WebMockTestCase): rows = list(main.fetch_markets({"foo": "bar"}, None)) psycopg2.connect.assert_called_once_with(foo="bar") - cursor_mock.execute.assert_called_once_with("SELECT config,user_id FROM market_configs") + cursor_mock.execute.assert_called_once_with("SELECT id,config,user_id FROM market_configs") self.assertEqual(["row_1", "row_2"], rows) @@ -3744,7 +3847,7 @@ class MainTest(WebMockTestCase): rows = list(main.fetch_markets({"foo": "bar"}, 1)) psycopg2.connect.assert_called_once_with(foo="bar") - cursor_mock.execute.assert_called_once_with("SELECT config,user_id FROM market_configs WHERE user_id = %s", 1) + cursor_mock.execute.assert_called_once_with("SELECT id,config,user_id FROM market_configs WHERE user_id = %s", 1) self.assertEqual(["row_1", "row_2"], rows) -- 2.41.0 From 88e486a61fd563b3d30d34e1962269eae5a931bf Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Sat, 24 Mar 2018 10:27:39 +0100 Subject: [PATCH 07/16] Add task to migrate reports to database --- tasks/import_reports_to_database.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 tasks/import_reports_to_database.py diff --git a/tasks/import_reports_to_database.py b/tasks/import_reports_to_database.py new file mode 100644 index 0000000..152c762 --- /dev/null +++ b/tasks/import_reports_to_database.py @@ -0,0 +1,50 @@ +import sys +import os +import simplejson as json +from datetime import datetime +from decimal import Decimal as D +import psycopg2 + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from main import parse_config + +config = sys.argv[1] +reports = sys.argv[2:] + +pg_config, report_path = parse_config(config) + +connection = psycopg2.connect(**pg_config) +cursor = connection.cursor() + +report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' +line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' + +user_id_to_market_id = { + 2: 1, + 1: 3, + } + +for report in reports: + with open(report, "rb") as f: + json_content = json.load(f, parse_float=D) + basename = os.path.basename(report) + date, rest = basename.split("_", 1) + user_id, rest = rest.split(".", 1) + + date = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") + market_id = user_id_to_market_id[int(user_id)] + debug = any("debug" in x and x["debug"] for x in json_content) + print(market_id, date, debug) + cursor.execute(report_query, (date, market_id, debug)) + report_id = cursor.fetchone()[0] + + for line in json_content: + date = datetime.strptime(line["date"], "%Y-%m-%dT%H:%M:%S.%f") + type_ = line["type"] + del(line["date"]) + del(line["type"]) + + cursor.execute(line_query, (date, report_id, type_, json.dumps(line))) +connection.commit() +cursor.close() +connection.close() -- 2.41.0 From 35667b31ddf1ce47a56ccbf4db9896dbc165ad0a Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Fri, 23 Mar 2018 23:33:36 +0100 Subject: [PATCH 08/16] Cleanup market from_config --- main.py | 20 +++++++++++--------- market.py | 17 +++++------------ test.py | 10 +++++----- 3 files changed, 21 insertions(+), 26 deletions(-) diff --git a/main.py b/main.py index 3e98289..4462192 100644 --- a/main.py +++ b/main.py @@ -62,9 +62,11 @@ def make_order(market, value, currency, action="acquire", def get_user_market(config_path, user_id, debug=False): pg_config, report_path = parse_config(config_path) - market_config = list(fetch_markets(pg_config, str(user_id)))[0][1] + market_id, market_config, user_id = list(fetch_markets(pg_config, str(user_id)))[0] args = type('Args', (object,), { "debug": debug, "quiet": False })() - return market.Market.from_config(market_config, args, user_id=user_id, report_path=report_path) + return market.Market.from_config(market_config, args, + pg_config=pg_config, market_id=market_id, + user_id=user_id, report_path=report_path) def fetch_markets(pg_config, user): connection = psycopg2.connect(**pg_config) @@ -132,7 +134,7 @@ def parse_args(argv): return args -def process(market_id, market_config, user_id, report_path, args, pg_config): +def process(market_config, market_id, user_id, args, report_path, pg_config): try: market.Market\ .from_config(market_config, args, @@ -151,13 +153,13 @@ def main(argv): import threading market.Portfolio.start_worker() - for row in fetch_markets(pg_config, args.user): - threading.Thread(target=process, args=[ - *row, report_path, args, pg_config - ]).start() + def process_(*args): + threading.Thread(target=process, args=args).start() else: - for row in fetch_markets(pg_config, args.user): - process(*row, report_path, args, pg_config) + process_ = process + + for market_id, market_config, user_id in fetch_markets(pg_config, args.user): + process_(market_config, market_id, user_id, args, report_path, pg_config) if __name__ == '__main__': # pragma: no cover main(sys.argv[1:]) diff --git a/market.py b/market.py index 78ced1a..496ec45 100644 --- a/market.py +++ b/market.py @@ -14,9 +14,7 @@ class Market: trades = None balances = None - def __init__(self, ccxt_instance, args, - user_id=None, market_id=None, - report_path=None, pg_config=None): + def __init__(self, ccxt_instance, args, **kwargs): self.args = args self.debug = args.debug self.ccxt = ccxt_instance @@ -26,14 +24,11 @@ class Market: self.balances = BalanceStore(self) self.processor = Processor(self) - self.user_id = user_id - self.market_id = market_id - self.report_path = report_path - self.pg_config = pg_config + for key in ["user_id", "market_id", "report_path", "pg_config"]: + setattr(self, key, kwargs.get(key, None)) @classmethod - def from_config(cls, config, args, - user_id=None, market_id=None, report_path=None, pg_config=None): + def from_config(cls, config, args, **kwargs): config["apiKey"] = config.pop("key", None) ccxt_instance = ccxt.poloniexE(config) @@ -50,9 +45,7 @@ class Market: ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, ccxt_instance.session.__class__) - return cls(ccxt_instance, args, - user_id=user_id, market_id=market_id, - pg_config=pg_config, report_path=report_path) + return cls(ccxt_instance, args, **kwargs) def store_report(self): self.report.merge(Portfolio.report) diff --git a/test.py b/test.py index 5b9c56c..637a305 100644 --- a/test.py +++ b/test.py @@ -3682,7 +3682,7 @@ class MainTest(WebMockTestCase): args_mock.after = "after" self.assertEqual("", stdout_mock.getvalue()) - main.process(3, "config", 1, "report_path", args_mock, "pg_config") + main.process("config", 3, 1, args_mock, "report_path", "pg_config") market_mock.from_config.assert_has_calls([ mock.call("config", args_mock, pg_config="pg_config", market_id=3, user_id=1, report_path="report_path"), @@ -3719,8 +3719,8 @@ class MainTest(WebMockTestCase): self.assertEqual(2, process.call_count) process.assert_has_calls([ - mock.call(3, "config1", 1, "report_path", args_mock, "pg_config"), - mock.call(1, "config2", 2, "report_path", args_mock, "pg_config"), + mock.call("config1", 3, 1, args_mock, "report_path", "pg_config"), + mock.call("config2", 1, 2, args_mock, "report_path", "pg_config"), ]) with self.subTest(parallel=True): with mock.patch("main.parse_args") as parse_args,\ @@ -3749,9 +3749,9 @@ class MainTest(WebMockTestCase): self.assertEqual(2, process.call_count) process.assert_has_calls([ mock.call.__bool__(), - mock.call(3, "config1", 1, "report_path", args_mock, "pg_config"), + mock.call("config1", 3, 1, args_mock, "report_path", "pg_config"), mock.call.__bool__(), - mock.call(1, "config2", 2, "report_path", args_mock, "pg_config"), + mock.call("config2", 1, 2, args_mock, "report_path", "pg_config"), ]) @mock.patch.object(main.sys, "exit") -- 2.41.0 From 472787b6360221588423d03fe3e73d92c09a7c9d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Sat, 24 Mar 2018 11:15:33 +0100 Subject: [PATCH 09/16] Fetch market_config for import report task --- tasks/import_reports_to_database.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tasks/import_reports_to_database.py b/tasks/import_reports_to_database.py index 152c762..6031cbe 100644 --- a/tasks/import_reports_to_database.py +++ b/tasks/import_reports_to_database.py @@ -16,13 +16,9 @@ pg_config, report_path = parse_config(config) connection = psycopg2.connect(**pg_config) cursor = connection.cursor() -report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' -line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' - -user_id_to_market_id = { - 2: 1, - 1: 3, - } +report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id' +line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s)' +market_config_query = "SELECT id FROM market_configs WHERE user_id = %s AND market_name = 'poloniex'" for report in reports: with open(report, "rb") as f: @@ -32,7 +28,8 @@ for report in reports: user_id, rest = rest.split(".", 1) date = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") - market_id = user_id_to_market_id[int(user_id)] + cursor.execute(market_config_query, user_id) + market_id = cursor.fetchone()[0] debug = any("debug" in x and x["debug"] for x in json_content) print(market_id, date, debug) cursor.execute(report_query, (date, market_id, debug)) @@ -44,7 +41,7 @@ for report in reports: del(line["date"]) del(line["type"]) - cursor.execute(line_query, (date, report_id, type_, json.dumps(line))) + cursor.execute(line_query, (date, report_id, type_, json.dumps(line, indent=" "))) connection.commit() cursor.close() connection.close() -- 2.41.0 From c7c1e0b26821fdd5622f81fb456f1028d4c9ab09 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Sat, 24 Mar 2018 15:18:31 +0100 Subject: [PATCH 10/16] Add retry facility for api call timeouts Fixes https://git.immae.eu/mantisbt/view.php?id=40 --- ccxt_wrapper.py | 30 ++++++++++++++++++++++++++++ test.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/ccxt_wrapper.py b/ccxt_wrapper.py index d37c306..c500659 100644 --- a/ccxt_wrapper.py +++ b/ccxt_wrapper.py @@ -1,12 +1,42 @@ from ccxt import * import decimal import time +from retry.api import retry_call +import re def _cw_exchange_sum(self, *args): return sum([arg for arg in args if isinstance(arg, (float, int, decimal.Decimal))]) Exchange.sum = _cw_exchange_sum class poloniexE(poloniex): + RETRIABLE_CALLS = [ + re.compile(r"^return"), + re.compile(r"^cancel"), + re.compile(r"^closeMarginPosition$"), + re.compile(r"^getMarginPosition$"), + ] + + def request(self, path, api='public', method='GET', params={}, headers=None, body=None): + """ + Wrapped to allow retry of non-posting requests" + """ + + origin_request = super(poloniexE, self).request + kwargs = { + "api": api, + "method": method, + "params": params, + "headers": headers, + "body": body + } + + retriable = any(re.match(call, path) for call in self.RETRIABLE_CALLS) + if api == "public" or method == "GET" or retriable: + return retry_call(origin_request, fargs=[path], fkwargs=kwargs, + tries=10, delay=1, exceptions=(RequestTimeout,)) + else: + return origin_request(path, **kwargs) + @staticmethod def nanoseconds(): return int(time.time() * 1000000000) diff --git a/test.py b/test.py index 637a305..40c64a9 100644 --- a/test.py +++ b/test.py @@ -80,6 +80,58 @@ class poloniexETest(unittest.TestCase): time.return_value = 123456.7890123456 self.assertEqual(123456789012345, self.s.nonce()) + def test_request(self): + with mock.patch.object(market.ccxt.poloniex, "request") as request,\ + mock.patch("market.ccxt.retry_call") as retry_call: + with self.subTest(wrapped=True): + with self.subTest(desc="public"): + self.s.request("foo") + retry_call.assert_called_with(request, + delay=1, tries=10, fargs=["foo"], + fkwargs={'api': 'public', 'method': 'GET', 'params': {}, 'headers': None, 'body': None}, + exceptions=(market.ccxt.RequestTimeout,)) + request.assert_not_called() + + with self.subTest(desc="private GET"): + self.s.request("foo", api="private") + retry_call.assert_called_with(request, + delay=1, tries=10, fargs=["foo"], + fkwargs={'api': 'private', 'method': 'GET', 'params': {}, 'headers': None, 'body': None}, + exceptions=(market.ccxt.RequestTimeout,)) + request.assert_not_called() + + with self.subTest(desc="private POST regexp"): + self.s.request("returnFoo", api="private", method="POST") + retry_call.assert_called_with(request, + delay=1, tries=10, fargs=["returnFoo"], + fkwargs={'api': 'private', 'method': 'POST', 'params': {}, 'headers': None, 'body': None}, + exceptions=(market.ccxt.RequestTimeout,)) + request.assert_not_called() + + with self.subTest(desc="private POST non-regexp"): + self.s.request("getMarginPosition", api="private", method="POST") + retry_call.assert_called_with(request, + delay=1, tries=10, fargs=["getMarginPosition"], + fkwargs={'api': 'private', 'method': 'POST', 'params': {}, 'headers': None, 'body': None}, + exceptions=(market.ccxt.RequestTimeout,)) + request.assert_not_called() + retry_call.reset_mock() + request.reset_mock() + with self.subTest(wrapped=False): + with self.subTest(desc="private POST non-matching regexp"): + self.s.request("marginBuy", api="private", method="POST") + request.assert_called_with("marginBuy", + api="private", method="POST", params={}, + headers=None, body=None) + retry_call.assert_not_called() + + with self.subTest(desc="private POST non-matching non-regexp"): + self.s.request("closeMarginPositionOther", api="private", method="POST") + request.assert_called_with("closeMarginPositionOther", + api="private", method="POST", params={}, + headers=None, body=None) + retry_call.assert_not_called() + def test_order_precision(self): self.assertEqual(8, self.s.order_precision("FOO")) -- 2.41.0 From 445b4a7712fb7fe45e17b6b76356dd3be42dd900 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Sat, 24 Mar 2018 15:18:56 +0100 Subject: [PATCH 11/16] Move request wrapper to ccxt --- ccxt_wrapper.py | 15 +++++++++++++++ market.py | 12 ------------ test.py | 18 ++++++++++++------ 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/ccxt_wrapper.py b/ccxt_wrapper.py index c500659..4ed37d9 100644 --- a/ccxt_wrapper.py +++ b/ccxt_wrapper.py @@ -37,6 +37,21 @@ class poloniexE(poloniex): else: return origin_request(path, **kwargs) + def __init__(self, *args, **kwargs): + super(poloniexE, self).__init__(*args, **kwargs) + + # For requests logging + self.session.origin_request = self.session.request + self.session._parent = self + + def request_wrap(self, *args, **kwargs): + r = self.origin_request(*args, **kwargs) + self._parent._market.report.log_http_request(args[0], + args[1], kwargs["data"], kwargs["headers"], r) + return r + self.session.request = request_wrap.__get__(self.session, + self.session.__class__) + @staticmethod def nanoseconds(): return int(time.time() * 1000000000) diff --git a/market.py b/market.py index 496ec45..055967c 100644 --- a/market.py +++ b/market.py @@ -33,18 +33,6 @@ class Market: ccxt_instance = ccxt.poloniexE(config) - # For requests logging - ccxt_instance.session.origin_request = ccxt_instance.session.request - ccxt_instance.session._parent = ccxt_instance - - def request_wrap(self, *args, **kwargs): - r = self.origin_request(*args, **kwargs) - self._parent._market.report.log_http_request(args[0], - args[1], kwargs["data"], kwargs["headers"], r) - return r - ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, - ccxt_instance.session.__class__) - return cls(ccxt_instance, args, **kwargs) def store_report(self): diff --git a/test.py b/test.py index 40c64a9..18616c1 100644 --- a/test.py +++ b/test.py @@ -70,6 +70,18 @@ class poloniexETest(unittest.TestCase): self.wm.stop() super(poloniexETest, self).tearDown() + def test__init(self): + with mock.patch("market.ccxt.poloniexE.session") as session: + session.request.return_value = "response" + ccxt = market.ccxt.poloniexE() + ccxt._market = mock.Mock + ccxt._market.report = mock.Mock() + + ccxt.session.request("GET", "URL", data="data", + headers="headers") + ccxt._market.report.log_http_request.assert_called_with('GET', 'URL', 'data', + 'headers', 'response') + def test_nanoseconds(self): with mock.patch.object(market.ccxt.time, "time") as time: time.return_value = 123456.7890123456 @@ -1177,17 +1189,11 @@ class MarketTest(WebMockTestCase): def test_from_config(self, ccxt): with mock.patch("market.ReportStore"): ccxt.poloniexE.return_value = self.ccxt - self.ccxt.session.request.return_value = "response" m = market.Market.from_config({"key": "key", "secred": "secret"}, self.market_args()) self.assertEqual(self.ccxt, m.ccxt) - self.ccxt.session.request("GET", "URL", data="data", - headers="headers") - m.report.log_http_request.assert_called_with('GET', 'URL', 'data', - 'headers', 'response') - m = market.Market.from_config({"key": "key", "secred": "secret"}, self.market_args(debug=True)) self.assertEqual(True, m.debug) -- 2.41.0 From 337c8286cc31d81ffdad06a225996f86c46c46f0 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Sun, 25 Mar 2018 19:08:48 +0200 Subject: [PATCH 12/16] Handle timeouts for move_balances --- market.py | 22 ++++++--- test.py | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 6 deletions(-) diff --git a/market.py b/market.py index 055967c..ca65bca 100644 --- a/market.py +++ b/market.py @@ -1,10 +1,11 @@ -from ccxt import ExchangeError, NotSupported +from ccxt import ExchangeError, NotSupported, RequestTimeout import ccxt_wrapper as ccxt import time import psycopg2 from store import * from cachetools.func import ttl_cache from datetime import datetime +from retry import retry import portfolio class Market: @@ -88,6 +89,7 @@ class Market: finally: self.store_report() + @retry(RequestTimeout, tries=5) def move_balances(self): needed_in_margin = {} moving_to_margin = {} @@ -102,13 +104,21 @@ class Market: current_balance = self.balances.all[currency].margin_available moving_to_margin[currency] = (needed - current_balance) delta = moving_to_margin[currency].value + action = "Moving {} from exchange to margin".format(moving_to_margin[currency]) + if self.debug and delta != 0: - self.report.log_debug_action("Moving {} from exchange to margin".format(moving_to_margin[currency])) + self.report.log_debug_action(action) continue - if delta > 0: - self.ccxt.transfer_balance(currency, delta, "exchange", "margin") - elif delta < 0: - self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + try: + if delta > 0: + self.ccxt.transfer_balance(currency, delta, "exchange", "margin") + elif delta < 0: + self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + except RequestTimeout as e: + self.report.log_error(action, message="Retrying", exception=e) + self.report.log_move_balances(needed_in_margin, moving_to_margin) + self.balances.fetch_balances() + raise e self.report.log_move_balances(needed_in_margin, moving_to_margin) self.balances.fetch_balances() diff --git a/test.py b/test.py index 18616c1..ea1fd9a 100644 --- a/test.py +++ b/test.py @@ -1444,6 +1444,139 @@ class MarketTest(WebMockTestCase): self.ccxt.transfer_balance.assert_any_call("USDT", 100, "exchange", "margin") self.ccxt.transfer_balance.assert_any_call("ETC", 5, "margin", "exchange") + m.report.reset_mock() + fetch_balances.reset_mock() + with self.subTest(retry=True): + with mock.patch("market.ReportStore"): + m = market.Market(self.ccxt, self.market_args()) + + value_from = portfolio.Amount("BTC", "0.0") + value_from.linked_to = portfolio.Amount("ETH", "0.0") + value_to = portfolio.Amount("BTC", "-3.0") + trade = portfolio.Trade(value_from, value_to, "ETH", m) + + m.trades.all = [trade] + balance = portfolio.Balance("BTC", { "margin_in_position": "0", "margin_available": "0" }) + m.balances.all = {"BTC": balance} + + m.ccxt.transfer_balance.side_effect = [ + market.ccxt.RequestTimeout, + True + ] + m.move_balances() + self.ccxt.transfer_balance.assert_has_calls([ + mock.call("BTC", 3, "exchange", "margin"), + mock.call("BTC", 3, "exchange", "margin") + ]) + self.assertEqual(2, fetch_balances.call_count) + m.report.log_error.assert_called_with(mock.ANY, message="Retrying", exception=mock.ANY) + self.assertEqual(2, m.report.log_move_balances.call_count) + + self.ccxt.transfer_balance.reset_mock() + m.report.reset_mock() + fetch_balances.reset_mock() + with self.subTest(retry=True, too_much=True): + with mock.patch("market.ReportStore"): + m = market.Market(self.ccxt, self.market_args()) + + value_from = portfolio.Amount("BTC", "0.0") + value_from.linked_to = portfolio.Amount("ETH", "0.0") + value_to = portfolio.Amount("BTC", "-3.0") + trade = portfolio.Trade(value_from, value_to, "ETH", m) + + m.trades.all = [trade] + balance = portfolio.Balance("BTC", { "margin_in_position": "0", "margin_available": "0" }) + m.balances.all = {"BTC": balance} + + m.ccxt.transfer_balance.side_effect = [ + market.ccxt.RequestTimeout, + market.ccxt.RequestTimeout, + market.ccxt.RequestTimeout, + market.ccxt.RequestTimeout, + market.ccxt.RequestTimeout, + ] + with self.assertRaises(market.ccxt.RequestTimeout): + m.move_balances() + + self.ccxt.transfer_balance.reset_mock() + m.report.reset_mock() + fetch_balances.reset_mock() + with self.subTest(retry=True, partial_result=True): + with mock.patch("market.ReportStore"): + m = market.Market(self.ccxt, self.market_args()) + + value_from = portfolio.Amount("BTC", "1.0") + value_from.linked_to = portfolio.Amount("ETH", "10.0") + value_to = portfolio.Amount("BTC", "10.0") + trade1 = portfolio.Trade(value_from, value_to, "ETH", m) + + value_from = portfolio.Amount("BTC", "0.0") + value_from.linked_to = portfolio.Amount("ETH", "0.0") + value_to = portfolio.Amount("BTC", "-3.0") + trade2 = portfolio.Trade(value_from, value_to, "ETH", m) + + value_from = portfolio.Amount("USDT", "0.0") + value_from.linked_to = portfolio.Amount("XVG", "0.0") + value_to = portfolio.Amount("USDT", "-50.0") + trade3 = portfolio.Trade(value_from, value_to, "XVG", m) + + m.trades.all = [trade1, trade2, trade3] + balance1 = portfolio.Balance("BTC", { "margin_in_position": "0", "margin_available": "0" }) + balance2 = portfolio.Balance("USDT", { "margin_in_position": "100", "margin_available": "50" }) + balance3 = portfolio.Balance("ETC", { "margin_in_position": "10", "margin_available": "15" }) + m.balances.all = {"BTC": balance1, "USDT": balance2, "ETC": balance3} + + call_counts = { "BTC": 0, "USDT": 0, "ETC": 0 } + def _transfer_balance(currency, amount, from_, to_): + call_counts[currency] += 1 + if currency == "BTC": + m.balances.all["BTC"] = portfolio.Balance("BTC", { "margin_in_position": "0", "margin_available": "3" }) + if currency == "USDT": + if call_counts["USDT"] == 1: + raise market.ccxt.RequestTimeout + else: + m.balances.all["USDT"] = portfolio.Balance("USDT", { "margin_in_position": "100", "margin_available": "150" }) + if currency == "ETC": + m.balances.all["ETC"] = portfolio.Balance("ETC", { "margin_in_position": "10", "margin_available": "10" }) + + + m.ccxt.transfer_balance.side_effect = _transfer_balance + + m.move_balances() + self.ccxt.transfer_balance.assert_has_calls([ + mock.call("BTC", 3, "exchange", "margin"), + mock.call('USDT', 100, 'exchange', 'margin'), + mock.call('USDT', 100, 'exchange', 'margin'), + mock.call("ETC", 5, "margin", "exchange") + ]) + self.assertEqual(2, fetch_balances.call_count) + m.report.log_error.assert_called_with(mock.ANY, message="Retrying", exception=mock.ANY) + self.assertEqual(2, m.report.log_move_balances.call_count) + m.report.log_move_balances.asser_has_calls([ + mock.call( + { + 'BTC': portfolio.Amount("BTC", "3"), + 'USDT': portfolio.Amount("USDT", "150"), + 'ETC': portfolio.Amount("ETC", "10"), + }, + { + 'BTC': portfolio.Amount("BTC", "3"), + 'USDT': portfolio.Amount("USDT", "100"), + }), + mock.call( + { + 'BTC': portfolio.Amount("BTC", "3"), + 'USDT': portfolio.Amount("USDT", "150"), + 'ETC': portfolio.Amount("ETC", "10"), + }, + { + 'BTC': portfolio.Amount("BTC", "0"), + 'USDT': portfolio.Amount("USDT", "100"), + 'ETC': portfolio.Amount("ETC", "-5"), + }), + ]) + + def test_store_file_report(self): file_open = mock.mock_open() m = market.Market(self.ccxt, self.market_args(), report_path="present", user_id=1) -- 2.41.0 From 7f3e7d27409fd7355db9ae4f8fe9a346cd50c712 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Sun, 25 Mar 2018 19:09:05 +0200 Subject: [PATCH 13/16] Handle timeouts for order creations --- portfolio.py | 79 +++++++- test.py | 501 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 572 insertions(+), 8 deletions(-) diff --git a/portfolio.py b/portfolio.py index 69e3755..9c58676 100644 --- a/portfolio.py +++ b/portfolio.py @@ -1,9 +1,7 @@ from datetime import datetime -from decimal import Decimal as D, ROUND_DOWN -from ccxt import ExchangeError, InsufficientFunds, ExchangeNotAvailable, InvalidOrder, OrderNotCached, OrderNotFound from retry import retry - -# FIXME: correctly handle web call timeouts +from decimal import Decimal as D, ROUND_DOWN +from ccxt import ExchangeError, InsufficientFunds, ExchangeNotAvailable, InvalidOrder, OrderNotCached, OrderNotFound, RequestTimeout class Computation: computations = { @@ -414,6 +412,9 @@ class Trade: for mouvement in order.mouvements: self.market.report.print_log("{}\t\t{}".format(ind, mouvement)) +class RetryException(Exception): + pass + class Order: def __init__(self, action, amount, rate, base_currency, trade_type, market, trade, close_if_possible=False): @@ -430,6 +431,7 @@ class Order: self.close_if_possible = close_if_possible self.id = None self.tries = 0 + self.start_date = None def as_json(self): return { @@ -475,18 +477,18 @@ class Order: def finished(self): return self.status.startswith("closed") or self.status == "canceled" or self.status == "error" - @retry(InsufficientFunds) + @retry((InsufficientFunds, RetryException)) def run(self): self.tries += 1 symbol = "{}/{}".format(self.amount.currency, self.base_currency) amount = round(self.amount, self.market.ccxt.order_precision(symbol)).value + action = "market.ccxt.create_order('{}', 'limit', '{}', {}, price={}, account={})".format(symbol, self.action, amount, self.rate, self.account) if self.market.debug: - self.market.report.log_debug_action("market.ccxt.create_order('{}', 'limit', '{}', {}, price={}, account={})".format( - symbol, self.action, amount, self.rate, self.account)) + self.market.report.log_debug_action(action) self.results.append({"debug": True, "id": -1}) else: - action = "market.ccxt.create_order('{}', 'limit', '{}', {}, price={}, account={})".format(symbol, self.action, amount, self.rate, self.account) + self.start_date = datetime.now() try: self.results.append(self.market.ccxt.create_order(symbol, 'limit', self.action, amount, price=self.rate, account=self.account)) except InvalidOrder: @@ -494,6 +496,19 @@ class Order: self.status = "closed" self.mark_finished_order() return + except RequestTimeout as e: + if not self.retrieve_order(): + if self.tries < 5: + self.market.report.log_error(action, message="Retrying after timeout", exception=e) + # We make a specific call in case retrieve_order + # would raise itself + raise RetryException + else: + self.market.report.log_error(action, message="Giving up {} after timeouts".format(self), exception=e) + self.status = "error" + return + else: + self.market.report.log_error(action, message="Timeout, found the order") except InsufficientFunds as e: if self.tries < 5: self.market.report.log_error(action, message="Retrying with reduced amount", exception=e) @@ -585,6 +600,54 @@ class Order: self.market.report.log_error("cancel_order", message="Already cancelled order", exception=e) self.fetch() + def retrieve_order(self): + symbol = "{}/{}".format(self.amount.currency, self.base_currency) + amount = round(self.amount, self.market.ccxt.order_precision(symbol)).value + start_timestamp = self.start_date.timestamp() - 5 + + similar_open_orders = self.market.ccxt.fetch_orders(symbol=symbol, since=start_timestamp) + for order in similar_open_orders: + if (order["info"]["margin"] == 1 and self.account == "exchange") or\ + (order["info"]["margin"] != 1 and self.account == "margin"): + i_m_tested = True # coverage bug ?! + continue + if order["info"]["side"] != self.action: + continue + amount_diff = round( + abs(D(order["info"]["startingAmount"]) - amount), + self.market.ccxt.order_precision(symbol)) + rate_diff = round( + abs(D(order["info"]["rate"]) - self.rate), + self.market.ccxt.order_precision(symbol)) + if amount_diff != 0 or rate_diff != 0: + continue + self.results.append({"id": order["id"]}) + return True + + similar_trades = self.market.ccxt.fetch_my_trades(symbol=symbol, since=start_timestamp) + # FIXME: use set instead of sorted(list(...)) + for order_id in sorted(list(map(lambda x: x["order"], similar_trades))): + trades = list(filter(lambda x: x["order"] == order_id, similar_trades)) + if any(x["timestamp"] < start_timestamp for x in trades): + continue + if any(x["side"] != self.action for x in trades): + continue + if any(x["info"]["category"] == "exchange" and self.account == "margin" for x in trades) or\ + any(x["info"]["category"] == "marginTrade" and self.account == "exchange" for x in trades): + continue + trade_sum = sum(D(x["info"]["amount"]) for x in trades) + amount_diff = round(abs(trade_sum - amount), + self.market.ccxt.order_precision(symbol)) + if amount_diff != 0: + continue + if (self.action == "sell" and any(D(x["info"]["rate"]) < self.rate for x in trades)) or\ + (self.action == "buy" and any(D(x["info"]["rate"]) > self.rate for x in trades)): + continue + self.results.append({"id": order_id}) + return True + + return False + class Mouvement: def __init__(self, currency, base_currency, hash_): self.currency = currency diff --git a/test.py b/test.py index ea1fd9a..5b9e2b4 100644 --- a/test.py +++ b/test.py @@ -3213,6 +3213,507 @@ class OrderTest(WebMockTestCase): self.assertEqual(5, self.m.report.log_error.call_count) self.m.report.log_error.assert_called_with(mock.ANY, message="Giving up Order(buy long 0.00096060 ETH at 0.1 BTC [pending])", exception=mock.ANY) + self.m.reset_mock() + with self.subTest(request_timeout=True): + order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), + D("0.1"), "BTC", "long", self.m, "trade") + with self.subTest(retrieved=False), \ + mock.patch.object(order, "retrieve_order") as retrieve: + self.m.ccxt.create_order.side_effect = [ + portfolio.RequestTimeout, + portfolio.RequestTimeout, + { "id": 123 }, + ] + retrieve.return_value = False + order.run() + self.m.ccxt.create_order.assert_has_calls([ + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + ]) + self.assertEqual(3, self.m.ccxt.create_order.call_count) + self.assertEqual(3, order.tries) + self.m.report.log_error.assert_called() + self.assertEqual(2, self.m.report.log_error.call_count) + self.m.report.log_error.assert_called_with(mock.ANY, message="Retrying after timeout", exception=mock.ANY) + self.assertEqual(123, order.id) + + self.m.reset_mock() + order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), + D("0.1"), "BTC", "long", self.m, "trade") + with self.subTest(retrieved=True), \ + mock.patch.object(order, "retrieve_order") as retrieve: + self.m.ccxt.create_order.side_effect = [ + portfolio.RequestTimeout, + ] + def _retrieve(): + order.results.append({"id": 123}) + return True + retrieve.side_effect = _retrieve + order.run() + self.m.ccxt.create_order.assert_has_calls([ + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + ]) + self.assertEqual(1, self.m.ccxt.create_order.call_count) + self.assertEqual(1, order.tries) + self.m.report.log_error.assert_called() + self.assertEqual(1, self.m.report.log_error.call_count) + self.m.report.log_error.assert_called_with(mock.ANY, message="Timeout, found the order") + self.assertEqual(123, order.id) + + self.m.reset_mock() + order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), + D("0.1"), "BTC", "long", self.m, "trade") + with self.subTest(retrieved=False), \ + mock.patch.object(order, "retrieve_order") as retrieve: + self.m.ccxt.create_order.side_effect = [ + portfolio.RequestTimeout, + portfolio.RequestTimeout, + portfolio.RequestTimeout, + portfolio.RequestTimeout, + portfolio.RequestTimeout, + ] + retrieve.return_value = False + order.run() + self.m.ccxt.create_order.assert_has_calls([ + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + ]) + self.assertEqual(5, self.m.ccxt.create_order.call_count) + self.assertEqual(5, order.tries) + self.m.report.log_error.assert_called() + self.assertEqual(5, self.m.report.log_error.call_count) + self.m.report.log_error.assert_called_with(mock.ANY, message="Giving up Order(buy long 0.00100000 ETH at 0.1 BTC [pending]) after timeouts", exception=mock.ANY) + self.assertEqual("error", order.status) + + def test_retrieve_order(self): + with self.subTest(similar_open_order=True): + order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), + D("0.1"), "BTC", "long", self.m, "trade") + order.start_date = datetime.datetime(2018, 3, 25, 15, 15, 55) + + self.m.ccxt.order_precision.return_value = 8 + self.m.ccxt.fetch_orders.return_value = [ + { # Wrong amount + 'amount': 0.002, 'cost': 0.1, + 'datetime': '2018-03-25T15:15:51.000Z', + 'fee': None, 'filled': 0.0, + 'id': '1', + 'info': { + 'amount': '0.002', + 'date': '2018-03-25 15:15:51', + 'margin': 0, 'orderNumber': '1', + 'price': '0.1', 'rate': '0.1', + 'side': 'buy', 'startingAmount': '0.002', + 'status': 'open', 'total': '0.0002', + 'type': 'limit' + }, + 'price': 0.1, 'remaining': 0.002, 'side': 'buy', + 'status': 'open', 'symbol': 'ETH/BTC', + 'timestamp': 1521990951000, 'trades': None, + 'type': 'limit' + }, + { # Margin + 'amount': 0.001, 'cost': 0.1, + 'datetime': '2018-03-25T15:15:51.000Z', + 'fee': None, 'filled': 0.0, + 'id': '2', + 'info': { + 'amount': '0.001', + 'date': '2018-03-25 15:15:51', + 'margin': 1, 'orderNumber': '2', + 'price': '0.1', 'rate': '0.1', + 'side': 'buy', 'startingAmount': '0.001', + 'status': 'open', 'total': '0.0001', + 'type': 'limit' + }, + 'price': 0.1, 'remaining': 0.001, 'side': 'buy', + 'status': 'open', 'symbol': 'ETH/BTC', + 'timestamp': 1521990951000, 'trades': None, + 'type': 'limit' + }, + { # selling + 'amount': 0.001, 'cost': 0.1, + 'datetime': '2018-03-25T15:15:51.000Z', + 'fee': None, 'filled': 0.0, + 'id': '3', + 'info': { + 'amount': '0.001', + 'date': '2018-03-25 15:15:51', + 'margin': 0, 'orderNumber': '3', + 'price': '0.1', 'rate': '0.1', + 'side': 'sell', 'startingAmount': '0.001', + 'status': 'open', 'total': '0.0001', + 'type': 'limit' + }, + 'price': 0.1, 'remaining': 0.001, 'side': 'sell', + 'status': 'open', 'symbol': 'ETH/BTC', + 'timestamp': 1521990951000, 'trades': None, + 'type': 'limit' + }, + { # Wrong rate + 'amount': 0.001, 'cost': 0.15, + 'datetime': '2018-03-25T15:15:51.000Z', + 'fee': None, 'filled': 0.0, + 'id': '4', + 'info': { + 'amount': '0.001', + 'date': '2018-03-25 15:15:51', + 'margin': 0, 'orderNumber': '4', + 'price': '0.15', 'rate': '0.15', + 'side': 'buy', 'startingAmount': '0.001', + 'status': 'open', 'total': '0.0001', + 'type': 'limit' + }, + 'price': 0.15, 'remaining': 0.001, 'side': 'buy', + 'status': 'open', 'symbol': 'ETH/BTC', + 'timestamp': 1521990951000, 'trades': None, + 'type': 'limit' + }, + { # All good + 'amount': 0.001, 'cost': 0.1, + 'datetime': '2018-03-25T15:15:51.000Z', + 'fee': None, 'filled': 0.0, + 'id': '5', + 'info': { + 'amount': '0.001', + 'date': '2018-03-25 15:15:51', + 'margin': 0, 'orderNumber': '1', + 'price': '0.1', 'rate': '0.1', + 'side': 'buy', 'startingAmount': '0.001', + 'status': 'open', 'total': '0.0001', + 'type': 'limit' + }, + 'price': 0.1, 'remaining': 0.001, 'side': 'buy', + 'status': 'open', 'symbol': 'ETH/BTC', + 'timestamp': 1521990951000, 'trades': None, + 'type': 'limit' + } + ] + result = order.retrieve_order() + self.assertTrue(result) + self.assertEqual('5', order.results[0]["id"]) + self.m.ccxt.fetch_my_trades.assert_not_called() + self.m.ccxt.fetch_orders.assert_called_once_with(symbol="ETH/BTC", since=1521983750) + + self.m.reset_mock() + with self.subTest(similar_open_order=False, past_trades=True): + order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), + D("0.1"), "BTC", "long", self.m, "trade") + order.start_date = datetime.datetime(2018, 3, 25, 15, 15, 55) + + self.m.ccxt.order_precision.return_value = 8 + self.m.ccxt.fetch_orders.return_value = [] + self.m.ccxt.fetch_my_trades.return_value = [ + { # Wrong timestamp 1 + 'amount': 0.0006, + 'cost': 0.00006, + 'datetime': '2018-03-25T15:15:14.000Z', + 'id': '1-1', + 'info': { + 'amount': '0.0006', + 'category': 'exchange', + 'date': '2018-03-25 15:15:14', + 'fee': '0.00150000', + 'globalTradeID': 1, + 'orderNumber': '1', + 'rate': '0.1', + 'total': '0.00006', + 'tradeID': '1-1', + 'type': 'buy' + }, + 'order': '1', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983714, + 'type': 'limit' + }, + { # Wrong timestamp 2 + 'amount': 0.0004, + 'cost': 0.00004, + 'datetime': '2018-03-25T15:16:54.000Z', + 'id': '1-2', + 'info': { + 'amount': '0.0004', + 'category': 'exchange', + 'date': '2018-03-25 15:16:54', + 'fee': '0.00150000', + 'globalTradeID': 2, + 'orderNumber': '1', + 'rate': '0.1', + 'total': '0.00004', + 'tradeID': '1-2', + 'type': 'buy' + }, + 'order': '1', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983814, + 'type': 'limit' + }, + { # Wrong side 1 + 'amount': 0.0006, + 'cost': 0.00006, + 'datetime': '2018-03-25T15:15:54.000Z', + 'id': '2-1', + 'info': { + 'amount': '0.0006', + 'category': 'exchange', + 'date': '2018-03-25 15:15:54', + 'fee': '0.00150000', + 'globalTradeID': 1, + 'orderNumber': '2', + 'rate': '0.1', + 'total': '0.00006', + 'tradeID': '2-1', + 'type': 'sell' + }, + 'order': '2', + 'price': 0.1, + 'side': 'sell', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983754, + 'type': 'limit' + }, + { # Wrong side 2 + 'amount': 0.0004, + 'cost': 0.00004, + 'datetime': '2018-03-25T15:16:54.000Z', + 'id': '2-2', + 'info': { + 'amount': '0.0004', + 'category': 'exchange', + 'date': '2018-03-25 15:16:54', + 'fee': '0.00150000', + 'globalTradeID': 2, + 'orderNumber': '2', + 'rate': '0.1', + 'total': '0.00004', + 'tradeID': '2-2', + 'type': 'buy' + }, + 'order': '2', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983814, + 'type': 'limit' + }, + { # Margin trade 1 + 'amount': 0.0006, + 'cost': 0.00006, + 'datetime': '2018-03-25T15:15:54.000Z', + 'id': '3-1', + 'info': { + 'amount': '0.0006', + 'category': 'marginTrade', + 'date': '2018-03-25 15:15:54', + 'fee': '0.00150000', + 'globalTradeID': 1, + 'orderNumber': '3', + 'rate': '0.1', + 'total': '0.00006', + 'tradeID': '3-1', + 'type': 'buy' + }, + 'order': '3', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983754, + 'type': 'limit' + }, + { # Margin trade 2 + 'amount': 0.0004, + 'cost': 0.00004, + 'datetime': '2018-03-25T15:16:54.000Z', + 'id': '3-2', + 'info': { + 'amount': '0.0004', + 'category': 'marginTrade', + 'date': '2018-03-25 15:16:54', + 'fee': '0.00150000', + 'globalTradeID': 2, + 'orderNumber': '3', + 'rate': '0.1', + 'total': '0.00004', + 'tradeID': '3-2', + 'type': 'buy' + }, + 'order': '3', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983814, + 'type': 'limit' + }, + { # Wrong amount 1 + 'amount': 0.0005, + 'cost': 0.00005, + 'datetime': '2018-03-25T15:15:54.000Z', + 'id': '4-1', + 'info': { + 'amount': '0.0005', + 'category': 'exchange', + 'date': '2018-03-25 15:15:54', + 'fee': '0.00150000', + 'globalTradeID': 1, + 'orderNumber': '4', + 'rate': '0.1', + 'total': '0.00005', + 'tradeID': '4-1', + 'type': 'buy' + }, + 'order': '4', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983754, + 'type': 'limit' + }, + { # Wrong amount 2 + 'amount': 0.0004, + 'cost': 0.00004, + 'datetime': '2018-03-25T15:16:54.000Z', + 'id': '4-2', + 'info': { + 'amount': '0.0004', + 'category': 'exchange', + 'date': '2018-03-25 15:16:54', + 'fee': '0.00150000', + 'globalTradeID': 2, + 'orderNumber': '4', + 'rate': '0.1', + 'total': '0.00004', + 'tradeID': '4-2', + 'type': 'buy' + }, + 'order': '4', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983814, + 'type': 'limit' + }, + { # Wrong price 1 + 'amount': 0.0006, + 'cost': 0.000066, + 'datetime': '2018-03-25T15:15:54.000Z', + 'id': '5-1', + 'info': { + 'amount': '0.0006', + 'category': 'exchange', + 'date': '2018-03-25 15:15:54', + 'fee': '0.00150000', + 'globalTradeID': 1, + 'orderNumber': '5', + 'rate': '0.11', + 'total': '0.000066', + 'tradeID': '5-1', + 'type': 'buy' + }, + 'order': '5', + 'price': 0.11, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983754, + 'type': 'limit' + }, + { # Wrong price 2 + 'amount': 0.0004, + 'cost': 0.00004, + 'datetime': '2018-03-25T15:16:54.000Z', + 'id': '5-2', + 'info': { + 'amount': '0.0004', + 'category': 'exchange', + 'date': '2018-03-25 15:16:54', + 'fee': '0.00150000', + 'globalTradeID': 2, + 'orderNumber': '5', + 'rate': '0.1', + 'total': '0.00004', + 'tradeID': '5-2', + 'type': 'buy' + }, + 'order': '5', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983814, + 'type': 'limit' + }, + { # All good 1 + 'amount': 0.0006, + 'cost': 0.00006, + 'datetime': '2018-03-25T15:15:54.000Z', + 'id': '7-1', + 'info': { + 'amount': '0.0006', + 'category': 'exchange', + 'date': '2018-03-25 15:15:54', + 'fee': '0.00150000', + 'globalTradeID': 1, + 'orderNumber': '7', + 'rate': '0.1', + 'total': '0.00006', + 'tradeID': '7-1', + 'type': 'buy' + }, + 'order': '7', + 'price': 0.1, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983754, + 'type': 'limit' + }, + { # All good 2 + 'amount': 0.0004, + 'cost': 0.000036, + 'datetime': '2018-03-25T15:16:54.000Z', + 'id': '7-2', + 'info': { + 'amount': '0.0004', + 'category': 'exchange', + 'date': '2018-03-25 15:16:54', + 'fee': '0.00150000', + 'globalTradeID': 2, + 'orderNumber': '7', + 'rate': '0.09', + 'total': '0.000036', + 'tradeID': '7-2', + 'type': 'buy' + }, + 'order': '7', + 'price': 0.09, + 'side': 'buy', + 'symbol': 'ETH/BTC', + 'timestamp': 1521983814, + 'type': 'limit' + }, + ] + + result = order.retrieve_order() + self.assertTrue(result) + self.assertEqual('7', order.results[0]["id"]) + self.m.ccxt.fetch_orders.assert_called_once_with(symbol="ETH/BTC", since=1521983750) + + self.m.reset_mock() + with self.subTest(similar_open_order=False, past_trades=False): + order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), + D("0.1"), "BTC", "long", self.m, "trade") + order.start_date = datetime.datetime(2018, 3, 25, 15, 15, 55) + + self.m.ccxt.order_precision.return_value = 8 + self.m.ccxt.fetch_orders.return_value = [] + self.m.ccxt.fetch_my_trades.return_value = [] + result = order.retrieve_order() + self.assertFalse(result) @unittest.skipUnless("unit" in limits, "Unit skipped") class MouvementTest(WebMockTestCase): -- 2.41.0 From b47d7b54cca8ff142eaadf38c8bb425bf11af2cd Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Sun, 25 Mar 2018 23:57:39 +0200 Subject: [PATCH 14/16] Handle invalid nonces --- ccxt_wrapper.py | 2 +- market.py | 6 +++--- portfolio.py | 12 +++++++++-- test.py | 56 +++++++++++++++++++++++++++++++++++++++++++------ 4 files changed, 64 insertions(+), 12 deletions(-) diff --git a/ccxt_wrapper.py b/ccxt_wrapper.py index 4ed37d9..97f359f 100644 --- a/ccxt_wrapper.py +++ b/ccxt_wrapper.py @@ -33,7 +33,7 @@ class poloniexE(poloniex): retriable = any(re.match(call, path) for call in self.RETRIABLE_CALLS) if api == "public" or method == "GET" or retriable: return retry_call(origin_request, fargs=[path], fkwargs=kwargs, - tries=10, delay=1, exceptions=(RequestTimeout,)) + tries=10, delay=1, exceptions=(RequestTimeout, InvalidNonce)) else: return origin_request(path, **kwargs) diff --git a/market.py b/market.py index ca65bca..d0e6ab4 100644 --- a/market.py +++ b/market.py @@ -1,4 +1,4 @@ -from ccxt import ExchangeError, NotSupported, RequestTimeout +from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce import ccxt_wrapper as ccxt import time import psycopg2 @@ -89,7 +89,7 @@ class Market: finally: self.store_report() - @retry(RequestTimeout, tries=5) + @retry((RequestTimeout, InvalidNonce), tries=5) def move_balances(self): needed_in_margin = {} moving_to_margin = {} @@ -114,7 +114,7 @@ class Market: self.ccxt.transfer_balance(currency, delta, "exchange", "margin") elif delta < 0: self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") - except RequestTimeout as e: + except (RequestTimeout, InvalidNonce) as e: self.report.log_error(action, message="Retrying", exception=e) self.report.log_move_balances(needed_in_margin, moving_to_margin) self.balances.fetch_balances() diff --git a/portfolio.py b/portfolio.py index 9c58676..9dae23e 100644 --- a/portfolio.py +++ b/portfolio.py @@ -1,7 +1,7 @@ from datetime import datetime from retry import retry from decimal import Decimal as D, ROUND_DOWN -from ccxt import ExchangeError, InsufficientFunds, ExchangeNotAvailable, InvalidOrder, OrderNotCached, OrderNotFound, RequestTimeout +from ccxt import ExchangeError, InsufficientFunds, ExchangeNotAvailable, InvalidOrder, OrderNotCached, OrderNotFound, RequestTimeout, InvalidNonce class Computation: computations = { @@ -477,7 +477,7 @@ class Order: def finished(self): return self.status.startswith("closed") or self.status == "canceled" or self.status == "error" - @retry((InsufficientFunds, RetryException)) + @retry((InsufficientFunds, RetryException, InvalidNonce)) def run(self): self.tries += 1 symbol = "{}/{}".format(self.amount.currency, self.base_currency) @@ -496,6 +496,14 @@ class Order: self.status = "closed" self.mark_finished_order() return + except InvalidNonce as e: + if self.tries < 5: + self.market.report.log_error(action, message="Retrying after invalid nonce", exception=e) + raise e + else: + self.market.report.log_error(action, message="Giving up {} after invalid nonce".format(self), exception=e) + self.status = "error" + return except RequestTimeout as e: if not self.retrieve_order(): if self.tries < 5: diff --git a/test.py b/test.py index 5b9e2b4..c229801 100644 --- a/test.py +++ b/test.py @@ -101,7 +101,7 @@ class poloniexETest(unittest.TestCase): retry_call.assert_called_with(request, delay=1, tries=10, fargs=["foo"], fkwargs={'api': 'public', 'method': 'GET', 'params': {}, 'headers': None, 'body': None}, - exceptions=(market.ccxt.RequestTimeout,)) + exceptions=(market.ccxt.RequestTimeout, market.ccxt.InvalidNonce)) request.assert_not_called() with self.subTest(desc="private GET"): @@ -109,7 +109,7 @@ class poloniexETest(unittest.TestCase): retry_call.assert_called_with(request, delay=1, tries=10, fargs=["foo"], fkwargs={'api': 'private', 'method': 'GET', 'params': {}, 'headers': None, 'body': None}, - exceptions=(market.ccxt.RequestTimeout,)) + exceptions=(market.ccxt.RequestTimeout, market.ccxt.InvalidNonce)) request.assert_not_called() with self.subTest(desc="private POST regexp"): @@ -117,7 +117,7 @@ class poloniexETest(unittest.TestCase): retry_call.assert_called_with(request, delay=1, tries=10, fargs=["returnFoo"], fkwargs={'api': 'private', 'method': 'POST', 'params': {}, 'headers': None, 'body': None}, - exceptions=(market.ccxt.RequestTimeout,)) + exceptions=(market.ccxt.RequestTimeout, market.ccxt.InvalidNonce)) request.assert_not_called() with self.subTest(desc="private POST non-regexp"): @@ -125,7 +125,7 @@ class poloniexETest(unittest.TestCase): retry_call.assert_called_with(request, delay=1, tries=10, fargs=["getMarginPosition"], fkwargs={'api': 'private', 'method': 'POST', 'params': {}, 'headers': None, 'body': None}, - exceptions=(market.ccxt.RequestTimeout,)) + exceptions=(market.ccxt.RequestTimeout, market.ccxt.InvalidNonce)) request.assert_not_called() retry_call.reset_mock() request.reset_mock() @@ -1461,16 +1461,18 @@ class MarketTest(WebMockTestCase): m.ccxt.transfer_balance.side_effect = [ market.ccxt.RequestTimeout, + market.ccxt.InvalidNonce, True ] m.move_balances() self.ccxt.transfer_balance.assert_has_calls([ + mock.call("BTC", 3, "exchange", "margin"), mock.call("BTC", 3, "exchange", "margin"), mock.call("BTC", 3, "exchange", "margin") ]) - self.assertEqual(2, fetch_balances.call_count) + self.assertEqual(3, fetch_balances.call_count) m.report.log_error.assert_called_with(mock.ANY, message="Retrying", exception=mock.ANY) - self.assertEqual(2, m.report.log_move_balances.call_count) + self.assertEqual(3, m.report.log_move_balances.call_count) self.ccxt.transfer_balance.reset_mock() m.report.reset_mock() @@ -3213,6 +3215,48 @@ class OrderTest(WebMockTestCase): self.assertEqual(5, self.m.report.log_error.call_count) self.m.report.log_error.assert_called_with(mock.ANY, message="Giving up Order(buy long 0.00096060 ETH at 0.1 BTC [pending])", exception=mock.ANY) + self.m.reset_mock() + with self.subTest(invalid_nonce=True): + with self.subTest(retry_success=True): + order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), + D("0.1"), "BTC", "long", self.m, "trade") + self.m.ccxt.create_order.side_effect = [ + portfolio.InvalidNonce, + portfolio.InvalidNonce, + { "id": 123 }, + ] + order.run() + self.m.ccxt.create_order.assert_has_calls([ + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + mock.call('ETH/BTC', 'limit', 'buy', D('0.0010'), account='exchange', price=D('0.1')), + ]) + self.assertEqual(3, self.m.ccxt.create_order.call_count) + self.assertEqual(3, order.tries) + self.m.report.log_error.assert_called() + self.assertEqual(2, self.m.report.log_error.call_count) + self.m.report.log_error.assert_called_with(mock.ANY, message="Retrying after invalid nonce", exception=mock.ANY) + self.assertEqual(123, order.id) + + self.m.reset_mock() + with self.subTest(retry_success=False): + order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), + D("0.1"), "BTC", "long", self.m, "trade") + self.m.ccxt.create_order.side_effect = [ + portfolio.InvalidNonce, + portfolio.InvalidNonce, + portfolio.InvalidNonce, + portfolio.InvalidNonce, + portfolio.InvalidNonce, + ] + order.run() + self.assertEqual(5, self.m.ccxt.create_order.call_count) + self.assertEqual(5, order.tries) + self.m.report.log_error.assert_called() + self.assertEqual(5, self.m.report.log_error.call_count) + self.m.report.log_error.assert_called_with(mock.ANY, message="Giving up Order(buy long 0.00100000 ETH at 0.1 BTC [pending]) after invalid nonce", exception=mock.ANY) + self.assertEqual("error", order.status) + self.m.reset_mock() with self.subTest(request_timeout=True): order = portfolio.Order("buy", portfolio.Amount("ETH", "0.001"), -- 2.41.0 From 915980607f7325b0759bd508d21aa1467a590392 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Tue, 27 Mar 2018 21:30:41 +0200 Subject: [PATCH 15/16] Add conditional install with virtual env --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index dfbe3b4..534bf88 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,9 @@ install: +ifeq (${VIRTUAL_ENV},) pip install --user -r requirements.txt +else + pip install -r requirements.txt +endif test: python test.py -- 2.41.0 From b03f2a307dfdaafc1597035ef669b14da0d55f77 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Isma=C3=ABl=20Bouya?= Date: Mon, 2 Apr 2018 13:40:14 +0200 Subject: [PATCH 16/16] Small refactor for `super` use --- ccxt_wrapper.py | 6 +++--- test.py | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ccxt_wrapper.py b/ccxt_wrapper.py index 97f359f..260d49d 100644 --- a/ccxt_wrapper.py +++ b/ccxt_wrapper.py @@ -21,7 +21,7 @@ class poloniexE(poloniex): Wrapped to allow retry of non-posting requests" """ - origin_request = super(poloniexE, self).request + origin_request = super().request kwargs = { "api": api, "method": method, @@ -38,7 +38,7 @@ class poloniexE(poloniex): return origin_request(path, **kwargs) def __init__(self, *args, **kwargs): - super(poloniexE, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) # For requests logging self.session.origin_request = self.session.request @@ -223,7 +223,7 @@ class poloniexE(poloniex): return all_balances def create_exchange_order(self, symbol, type, side, amount, price=None, params={}): - return super(poloniexE, self).create_order(symbol, type, side, amount, price=price, params=params) + return super().create_order(symbol, type, side, amount, price=price, params=params) def create_margin_order(self, symbol, type, side, amount, price=None, lending_rate=None, params={}): if type == 'market': diff --git a/test.py b/test.py index c229801..9ea9df4 100644 --- a/test.py +++ b/test.py @@ -27,7 +27,7 @@ class WebMockTestCase(unittest.TestCase): return type('Args', (object,), { "debug": debug, "quiet": quiet })() def setUp(self): - super(WebMockTestCase, self).setUp() + super().setUp() self.wm = requests_mock.Mocker() self.wm.start() @@ -55,12 +55,12 @@ class WebMockTestCase(unittest.TestCase): for patcher in self.patchers: patcher.stop() self.wm.stop() - super(WebMockTestCase, self).tearDown() + super().tearDown() @unittest.skipUnless("unit" in limits, "Unit skipped") class poloniexETest(unittest.TestCase): def setUp(self): - super(poloniexETest, self).setUp() + super().setUp() self.wm = requests_mock.Mocker() self.wm.start() @@ -68,7 +68,7 @@ class poloniexETest(unittest.TestCase): def tearDown(self): self.wm.stop() - super(poloniexETest, self).tearDown() + super().tearDown() def test__init(self): with mock.patch("market.ccxt.poloniexE.session") as session: @@ -609,7 +609,7 @@ class LockedVar(unittest.TestCase): @unittest.skipUnless("unit" in limits, "Unit skipped") class PortfolioTest(WebMockTestCase): def setUp(self): - super(PortfolioTest, self).setUp() + super().setUp() with open("test_samples/test_portfolio.json") as example: self.json_response = example.read() @@ -1154,7 +1154,7 @@ class BalanceTest(WebMockTestCase): @unittest.skipUnless("unit" in limits, "Unit skipped") class MarketTest(WebMockTestCase): def setUp(self): - super(MarketTest, self).setUp() + super().setUp() self.ccxt = mock.Mock(spec=market.ccxt.poloniexE) @@ -2056,7 +2056,7 @@ class TradeStoreTest(WebMockTestCase): @unittest.skipUnless("unit" in limits, "Unit skipped") class BalanceStoreTest(WebMockTestCase): def setUp(self): - super(BalanceStoreTest, self).setUp() + super().setUp() self.fetch_balance = { "ETC": { -- 2.41.0