diff options
author | Ismaël Bouya <ismael.bouya@normalesup.org> | 2018-08-26 22:22:03 +0200 |
---|---|---|
committer | Ismaël Bouya <ismael.bouya@normalesup.org> | 2018-08-26 22:22:03 +0200 |
commit | 8f98e46aa52f4e229ae4f44cd9cc49f9fad9c866 (patch) | |
tree | f512e943a84127881c41b498a3ce4d5d0ca0cdb4 | |
parent | 445185355d0d13d8d7251fa9add8e249c5361aa7 (diff) | |
download | Trader-8f98e46aa52f4e229ae4f44cd9cc49f9fad9c866.tar.gz Trader-8f98e46aa52f4e229ae4f44cd9cc49f9fad9c866.tar.zst Trader-8f98e46aa52f4e229ae4f44cd9cc49f9fad9c866.zip |
Add increasing delay to fetching cryptoportfolio
-rw-r--r-- | store.py | 35 | ||||
-rw-r--r-- | tests/helper.py | 1 | ||||
-rw-r--r-- | tests/test_market.py | 2 | ||||
-rw-r--r-- | tests/test_store.py | 50 |
4 files changed, 60 insertions, 28 deletions
@@ -499,13 +499,32 @@ class Portfolio: | |||
499 | worker_started = False | 499 | worker_started = False |
500 | worker_notify = None | 500 | worker_notify = None |
501 | callback = None | 501 | callback = None |
502 | poll_started_at = None | ||
502 | 503 | ||
503 | @classmethod | 504 | @classmethod |
504 | def start_worker(cls, poll=30): | 505 | def next_wait_time(cls): |
506 | now = datetime.datetime.now() | ||
507 | if cls.poll_started_at is None: | ||
508 | cls.poll_started_at = now | ||
509 | delta = now - cls.poll_started_at | ||
510 | |||
511 | if delta < datetime.timedelta(minutes=30): | ||
512 | return 30 | ||
513 | elif delta < datetime.timedelta(hours=1): | ||
514 | return 60 | ||
515 | elif delta < datetime.timedelta(hours=4): | ||
516 | return 5*60 | ||
517 | elif delta < datetime.timedelta(days=1): | ||
518 | return 60*60 | ||
519 | else: | ||
520 | raise "Too long waiting" | ||
521 | |||
522 | @classmethod | ||
523 | def start_worker(cls): | ||
505 | import threading | 524 | import threading |
506 | 525 | ||
507 | cls.worker = threading.Thread(name="portfolio", daemon=True, | 526 | cls.worker = threading.Thread(name="portfolio", daemon=True, |
508 | target=cls.wait_for_notification, kwargs={"poll": poll}) | 527 | target=cls.wait_for_notification) |
509 | cls.worker_notify = threading.Event() | 528 | cls.worker_notify = threading.Event() |
510 | cls.callback = threading.Event() | 529 | cls.callback = threading.Event() |
511 | 530 | ||
@@ -526,7 +545,7 @@ class Portfolio: | |||
526 | return cls.worker == threading.current_thread() | 545 | return cls.worker == threading.current_thread() |
527 | 546 | ||
528 | @classmethod | 547 | @classmethod |
529 | def wait_for_notification(cls, poll=30): | 548 | def wait_for_notification(cls): |
530 | if not cls.is_worker_thread(): | 549 | if not cls.is_worker_thread(): |
531 | raise RuntimeError("This method needs to be ran with the worker") | 550 | raise RuntimeError("This method needs to be ran with the worker") |
532 | while cls.worker_started: | 551 | while cls.worker_started: |
@@ -536,7 +555,7 @@ class Portfolio: | |||
536 | cls.report.print_log("[Worker] Fetching cryptoportfolio") | 555 | cls.report.print_log("[Worker] Fetching cryptoportfolio") |
537 | cls.get_cryptoportfolio(refetch=True) | 556 | cls.get_cryptoportfolio(refetch=True) |
538 | cls.callback.set() | 557 | cls.callback.set() |
539 | time.sleep(poll) | 558 | time.sleep(cls.next_wait_time()) |
540 | 559 | ||
541 | @classmethod | 560 | @classmethod |
542 | def stop_worker(cls): | 561 | def stop_worker(cls): |
@@ -550,11 +569,11 @@ class Portfolio: | |||
550 | cls.callback.wait() | 569 | cls.callback.wait() |
551 | 570 | ||
552 | @classmethod | 571 | @classmethod |
553 | def wait_for_recent(cls, delta=4, poll=30): | 572 | def wait_for_recent(cls, delta=4): |
554 | cls.get_cryptoportfolio() | 573 | cls.get_cryptoportfolio() |
555 | while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta): | 574 | while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta): |
556 | if cls.worker is None: | 575 | if cls.worker is None: |
557 | time.sleep(poll) | 576 | time.sleep(cls.next_wait_time()) |
558 | cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") | 577 | cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") |
559 | cls.get_cryptoportfolio(refetch=True) | 578 | cls.get_cryptoportfolio(refetch=True) |
560 | 579 | ||
@@ -585,7 +604,7 @@ class Portfolio: | |||
585 | cls.data.set(r.json(parse_int=D, parse_float=D)) | 604 | cls.data.set(r.json(parse_int=D, parse_float=D)) |
586 | cls.parse_cryptoportfolio() | 605 | cls.parse_cryptoportfolio() |
587 | cls.store_cryptoportfolio() | 606 | cls.store_cryptoportfolio() |
588 | except (JSONDecodeError, SimpleJSONDecodeError): | 607 | except (AssertionError, JSONDecodeError, SimpleJSONDecodeError): |
589 | cls.data.set(None) | 608 | cls.data.set(None) |
590 | cls.last_date.set(None) | 609 | cls.last_date.set(None) |
591 | cls.liquidities.set({}) | 610 | cls.liquidities.set({}) |
@@ -648,6 +667,8 @@ class Portfolio: | |||
648 | high_liquidity = parse_weights(cls.data.get("portfolio_1")) | 667 | high_liquidity = parse_weights(cls.data.get("portfolio_1")) |
649 | medium_liquidity = parse_weights(cls.data.get("portfolio_2")) | 668 | medium_liquidity = parse_weights(cls.data.get("portfolio_2")) |
650 | 669 | ||
670 | assert len(high_liquidity) > 0 | ||
671 | assert len(medium_liquidity) > 0 | ||
651 | cls.liquidities.set({ | 672 | cls.liquidities.set({ |
652 | "medium": medium_liquidity, | 673 | "medium": medium_liquidity, |
653 | "high": high_liquidity, | 674 | "high": high_liquidity, |
diff --git a/tests/helper.py b/tests/helper.py index 935e060..a2f8a22 100644 --- a/tests/helper.py +++ b/tests/helper.py | |||
@@ -45,6 +45,7 @@ class WebMockTestCase(unittest.TestCase): | |||
45 | worker_tag="", | 45 | worker_tag="", |
46 | worker_notify=None, | 46 | worker_notify=None, |
47 | worker_started=False, | 47 | worker_started=False, |
48 | poll_started_at=None, | ||
48 | callback=None), | 49 | callback=None), |
49 | mock.patch.multiple(portfolio.Computation, | 50 | mock.patch.multiple(portfolio.Computation, |
50 | computations=portfolio.Computation.computations), | 51 | computations=portfolio.Computation.computations), |
diff --git a/tests/test_market.py b/tests/test_market.py index 49d159c..e6e6f36 100644 --- a/tests/test_market.py +++ b/tests/test_market.py | |||
@@ -1033,7 +1033,7 @@ class ProcessorTest(WebMockTestCase): | |||
1033 | 1033 | ||
1034 | method, arguments = processor.method_arguments("wait_for_recent") | 1034 | method, arguments = processor.method_arguments("wait_for_recent") |
1035 | self.assertEqual(market.Portfolio.wait_for_recent, method) | 1035 | self.assertEqual(market.Portfolio.wait_for_recent, method) |
1036 | self.assertEqual(["delta", "poll"], arguments) | 1036 | self.assertEqual(["delta"], arguments) |
1037 | 1037 | ||
1038 | method, arguments = processor.method_arguments("prepare_trades") | 1038 | method, arguments = processor.method_arguments("prepare_trades") |
1039 | self.assertEqual(m.prepare_trades, method) | 1039 | self.assertEqual(m.prepare_trades, method) |
diff --git a/tests/test_store.py b/tests/test_store.py index 3097f6d..4ab9bdf 100644 --- a/tests/test_store.py +++ b/tests/test_store.py | |||
@@ -1430,22 +1430,8 @@ class PortfolioTest(WebMockTestCase): | |||
1430 | del(data["portfolio_2"]["weights"]) | 1430 | del(data["portfolio_2"]["weights"]) |
1431 | market.Portfolio.data = store.LockedVar(data) | 1431 | market.Portfolio.data = store.LockedVar(data) |
1432 | 1432 | ||
1433 | market.Portfolio.parse_cryptoportfolio() | 1433 | with self.assertRaises(AssertionError): |
1434 | self.assertListEqual( | 1434 | market.Portfolio.parse_cryptoportfolio() |
1435 | ["medium", "high"], | ||
1436 | list(market.Portfolio.liquidities.get().keys())) | ||
1437 | self.assertEqual({}, market.Portfolio.liquidities.get("medium")) | ||
1438 | |||
1439 | with self.subTest(description="All missing weights"): | ||
1440 | data = store.json.loads(self.json_response, parse_int=D, parse_float=D) | ||
1441 | del(data["portfolio_1"]["weights"]) | ||
1442 | del(data["portfolio_2"]["weights"]) | ||
1443 | market.Portfolio.data = store.LockedVar(data) | ||
1444 | |||
1445 | market.Portfolio.parse_cryptoportfolio() | ||
1446 | self.assertEqual({}, market.Portfolio.liquidities.get("medium")) | ||
1447 | self.assertEqual({}, market.Portfolio.liquidities.get("high")) | ||
1448 | self.assertEqual(datetime.datetime(1,1,1), market.Portfolio.last_date.get()) | ||
1449 | 1435 | ||
1450 | @mock.patch.object(store.dbs, "redis_connected") | 1436 | @mock.patch.object(store.dbs, "redis_connected") |
1451 | @mock.patch.object(store.dbs, "redis") | 1437 | @mock.patch.object(store.dbs, "redis") |
@@ -1551,7 +1537,8 @@ class PortfolioTest(WebMockTestCase): | |||
1551 | 1537 | ||
1552 | @mock.patch.object(market.time, "sleep") | 1538 | @mock.patch.object(market.time, "sleep") |
1553 | @mock.patch.object(market.Portfolio, "get_cryptoportfolio") | 1539 | @mock.patch.object(market.Portfolio, "get_cryptoportfolio") |
1554 | def test_wait_for_recent(self, get_cryptoportfolio, sleep): | 1540 | @mock.patch.object(market.Portfolio, "next_wait_time") |
1541 | def test_wait_for_recent(self, next_wait_time, get_cryptoportfolio, sleep): | ||
1555 | self.call_count = 0 | 1542 | self.call_count = 0 |
1556 | def _get(refetch=False): | 1543 | def _get(refetch=False): |
1557 | if self.call_count != 0: | 1544 | if self.call_count != 0: |
@@ -1563,6 +1550,7 @@ class PortfolioTest(WebMockTestCase): | |||
1563 | - store.datetime.timedelta(10)\ | 1550 | - store.datetime.timedelta(10)\ |
1564 | + store.datetime.timedelta(self.call_count)) | 1551 | + store.datetime.timedelta(self.call_count)) |
1565 | get_cryptoportfolio.side_effect = _get | 1552 | get_cryptoportfolio.side_effect = _get |
1553 | next_wait_time.return_value = 30 | ||
1566 | 1554 | ||
1567 | market.Portfolio.wait_for_recent() | 1555 | market.Portfolio.wait_for_recent() |
1568 | sleep.assert_called_with(30) | 1556 | sleep.assert_called_with(30) |
@@ -1608,7 +1596,7 @@ class PortfolioTest(WebMockTestCase): | |||
1608 | def test_start_worker(self): | 1596 | def test_start_worker(self): |
1609 | with mock.patch.object(store.Portfolio, "wait_for_notification") as notification: | 1597 | with mock.patch.object(store.Portfolio, "wait_for_notification") as notification: |
1610 | store.Portfolio.start_worker() | 1598 | store.Portfolio.start_worker() |
1611 | notification.assert_called_once_with(poll=30) | 1599 | notification.assert_called_once_with() |
1612 | 1600 | ||
1613 | self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__) | 1601 | self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__) |
1614 | self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__) | 1602 | self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__) |
@@ -1626,7 +1614,7 @@ class PortfolioTest(WebMockTestCase): | |||
1626 | with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\ | 1614 | with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\ |
1627 | mock.patch.object(store.Portfolio, "report") as report,\ | 1615 | mock.patch.object(store.Portfolio, "report") as report,\ |
1628 | mock.patch.object(store.time, "sleep") as sleep: | 1616 | mock.patch.object(store.time, "sleep") as sleep: |
1629 | store.Portfolio.start_worker(poll=3) | 1617 | store.Portfolio.start_worker() |
1630 | store.Portfolio.stop_worker() | 1618 | store.Portfolio.stop_worker() |
1631 | store.Portfolio.worker.join() | 1619 | store.Portfolio.worker.join() |
1632 | get.assert_not_called() | 1620 | get.assert_not_called() |
@@ -1640,8 +1628,10 @@ class PortfolioTest(WebMockTestCase): | |||
1640 | 1628 | ||
1641 | with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\ | 1629 | with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\ |
1642 | mock.patch.object(store.Portfolio, "report") as report,\ | 1630 | mock.patch.object(store.Portfolio, "report") as report,\ |
1631 | mock.patch.object(store.Portfolio, "next_wait_time") as wait,\ | ||
1643 | mock.patch.object(store.time, "sleep") as sleep: | 1632 | mock.patch.object(store.time, "sleep") as sleep: |
1644 | store.Portfolio.start_worker(poll=3) | 1633 | wait.return_value = 3 |
1634 | store.Portfolio.start_worker() | ||
1645 | 1635 | ||
1646 | store.Portfolio.worker_notify.set() | 1636 | store.Portfolio.worker_notify.set() |
1647 | 1637 | ||
@@ -1667,4 +1657,24 @@ class PortfolioTest(WebMockTestCase): | |||
1667 | worker_notify.set.assert_called_once_with() | 1657 | worker_notify.set.assert_called_once_with() |
1668 | callback.wait.assert_called_once_with() | 1658 | callback.wait.assert_called_once_with() |
1669 | 1659 | ||
1660 | def test_next_wait_time(self): | ||
1661 | with self.subTest("first start"): | ||
1662 | self.assertEqual(30, store.Portfolio.next_wait_time()) | ||
1663 | self.assertIsNotNone(store.Portfolio.poll_started_at) | ||
1664 | with self.subTest("25min"): | ||
1665 | store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=25) | ||
1666 | self.assertEqual(30, store.Portfolio.next_wait_time()) | ||
1667 | with self.subTest("35min"): | ||
1668 | store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=35) | ||
1669 | self.assertEqual(60, store.Portfolio.next_wait_time()) | ||
1670 | with self.subTest("1h15"): | ||
1671 | store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=75) | ||
1672 | self.assertEqual(300, store.Portfolio.next_wait_time()) | ||
1673 | with self.subTest("5hours"): | ||
1674 | store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(hours=5) | ||
1675 | self.assertEqual(3600, store.Portfolio.next_wait_time()) | ||
1676 | with self.subTest("overdue"), self.assertRaises(Exception): | ||
1677 | store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(hours=25) | ||
1678 | store.Portfolio.next_wait_time() | ||
1679 | |||
1670 | 1680 | ||