aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--store.py35
-rw-r--r--tests/helper.py1
-rw-r--r--tests/test_market.py2
-rw-r--r--tests/test_store.py50
4 files changed, 60 insertions, 28 deletions
diff --git a/store.py b/store.py
index 5533759..f987bb9 100644
--- a/store.py
+++ b/store.py
@@ -499,13 +499,32 @@ class Portfolio:
499 worker_started = False 499 worker_started = False
500 worker_notify = None 500 worker_notify = None
501 callback = None 501 callback = None
502 poll_started_at = None
502 503
503 @classmethod 504 @classmethod
504 def start_worker(cls, poll=30): 505 def next_wait_time(cls):
506 now = datetime.datetime.now()
507 if cls.poll_started_at is None:
508 cls.poll_started_at = now
509 delta = now - cls.poll_started_at
510
511 if delta < datetime.timedelta(minutes=30):
512 return 30
513 elif delta < datetime.timedelta(hours=1):
514 return 60
515 elif delta < datetime.timedelta(hours=4):
516 return 5*60
517 elif delta < datetime.timedelta(days=1):
518 return 60*60
519 else:
520 raise "Too long waiting"
521
522 @classmethod
523 def start_worker(cls):
505 import threading 524 import threading
506 525
507 cls.worker = threading.Thread(name="portfolio", daemon=True, 526 cls.worker = threading.Thread(name="portfolio", daemon=True,
508 target=cls.wait_for_notification, kwargs={"poll": poll}) 527 target=cls.wait_for_notification)
509 cls.worker_notify = threading.Event() 528 cls.worker_notify = threading.Event()
510 cls.callback = threading.Event() 529 cls.callback = threading.Event()
511 530
@@ -526,7 +545,7 @@ class Portfolio:
526 return cls.worker == threading.current_thread() 545 return cls.worker == threading.current_thread()
527 546
528 @classmethod 547 @classmethod
529 def wait_for_notification(cls, poll=30): 548 def wait_for_notification(cls):
530 if not cls.is_worker_thread(): 549 if not cls.is_worker_thread():
531 raise RuntimeError("This method needs to be ran with the worker") 550 raise RuntimeError("This method needs to be ran with the worker")
532 while cls.worker_started: 551 while cls.worker_started:
@@ -536,7 +555,7 @@ class Portfolio:
536 cls.report.print_log("[Worker] Fetching cryptoportfolio") 555 cls.report.print_log("[Worker] Fetching cryptoportfolio")
537 cls.get_cryptoportfolio(refetch=True) 556 cls.get_cryptoportfolio(refetch=True)
538 cls.callback.set() 557 cls.callback.set()
539 time.sleep(poll) 558 time.sleep(cls.next_wait_time())
540 559
541 @classmethod 560 @classmethod
542 def stop_worker(cls): 561 def stop_worker(cls):
@@ -550,11 +569,11 @@ class Portfolio:
550 cls.callback.wait() 569 cls.callback.wait()
551 570
552 @classmethod 571 @classmethod
553 def wait_for_recent(cls, delta=4, poll=30): 572 def wait_for_recent(cls, delta=4):
554 cls.get_cryptoportfolio() 573 cls.get_cryptoportfolio()
555 while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta): 574 while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta):
556 if cls.worker is None: 575 if cls.worker is None:
557 time.sleep(poll) 576 time.sleep(cls.next_wait_time())
558 cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") 577 cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
559 cls.get_cryptoportfolio(refetch=True) 578 cls.get_cryptoportfolio(refetch=True)
560 579
@@ -585,7 +604,7 @@ class Portfolio:
585 cls.data.set(r.json(parse_int=D, parse_float=D)) 604 cls.data.set(r.json(parse_int=D, parse_float=D))
586 cls.parse_cryptoportfolio() 605 cls.parse_cryptoportfolio()
587 cls.store_cryptoportfolio() 606 cls.store_cryptoportfolio()
588 except (JSONDecodeError, SimpleJSONDecodeError): 607 except (AssertionError, JSONDecodeError, SimpleJSONDecodeError):
589 cls.data.set(None) 608 cls.data.set(None)
590 cls.last_date.set(None) 609 cls.last_date.set(None)
591 cls.liquidities.set({}) 610 cls.liquidities.set({})
@@ -648,6 +667,8 @@ class Portfolio:
648 high_liquidity = parse_weights(cls.data.get("portfolio_1")) 667 high_liquidity = parse_weights(cls.data.get("portfolio_1"))
649 medium_liquidity = parse_weights(cls.data.get("portfolio_2")) 668 medium_liquidity = parse_weights(cls.data.get("portfolio_2"))
650 669
670 assert len(high_liquidity) > 0
671 assert len(medium_liquidity) > 0
651 cls.liquidities.set({ 672 cls.liquidities.set({
652 "medium": medium_liquidity, 673 "medium": medium_liquidity,
653 "high": high_liquidity, 674 "high": high_liquidity,
diff --git a/tests/helper.py b/tests/helper.py
index 935e060..a2f8a22 100644
--- a/tests/helper.py
+++ b/tests/helper.py
@@ -45,6 +45,7 @@ class WebMockTestCase(unittest.TestCase):
45 worker_tag="", 45 worker_tag="",
46 worker_notify=None, 46 worker_notify=None,
47 worker_started=False, 47 worker_started=False,
48 poll_started_at=None,
48 callback=None), 49 callback=None),
49 mock.patch.multiple(portfolio.Computation, 50 mock.patch.multiple(portfolio.Computation,
50 computations=portfolio.Computation.computations), 51 computations=portfolio.Computation.computations),
diff --git a/tests/test_market.py b/tests/test_market.py
index 49d159c..e6e6f36 100644
--- a/tests/test_market.py
+++ b/tests/test_market.py
@@ -1033,7 +1033,7 @@ class ProcessorTest(WebMockTestCase):
1033 1033
1034 method, arguments = processor.method_arguments("wait_for_recent") 1034 method, arguments = processor.method_arguments("wait_for_recent")
1035 self.assertEqual(market.Portfolio.wait_for_recent, method) 1035 self.assertEqual(market.Portfolio.wait_for_recent, method)
1036 self.assertEqual(["delta", "poll"], arguments) 1036 self.assertEqual(["delta"], arguments)
1037 1037
1038 method, arguments = processor.method_arguments("prepare_trades") 1038 method, arguments = processor.method_arguments("prepare_trades")
1039 self.assertEqual(m.prepare_trades, method) 1039 self.assertEqual(m.prepare_trades, method)
diff --git a/tests/test_store.py b/tests/test_store.py
index 3097f6d..4ab9bdf 100644
--- a/tests/test_store.py
+++ b/tests/test_store.py
@@ -1430,22 +1430,8 @@ class PortfolioTest(WebMockTestCase):
1430 del(data["portfolio_2"]["weights"]) 1430 del(data["portfolio_2"]["weights"])
1431 market.Portfolio.data = store.LockedVar(data) 1431 market.Portfolio.data = store.LockedVar(data)
1432 1432
1433 market.Portfolio.parse_cryptoportfolio() 1433 with self.assertRaises(AssertionError):
1434 self.assertListEqual( 1434 market.Portfolio.parse_cryptoportfolio()
1435 ["medium", "high"],
1436 list(market.Portfolio.liquidities.get().keys()))
1437 self.assertEqual({}, market.Portfolio.liquidities.get("medium"))
1438
1439 with self.subTest(description="All missing weights"):
1440 data = store.json.loads(self.json_response, parse_int=D, parse_float=D)
1441 del(data["portfolio_1"]["weights"])
1442 del(data["portfolio_2"]["weights"])
1443 market.Portfolio.data = store.LockedVar(data)
1444
1445 market.Portfolio.parse_cryptoportfolio()
1446 self.assertEqual({}, market.Portfolio.liquidities.get("medium"))
1447 self.assertEqual({}, market.Portfolio.liquidities.get("high"))
1448 self.assertEqual(datetime.datetime(1,1,1), market.Portfolio.last_date.get())
1449 1435
1450 @mock.patch.object(store.dbs, "redis_connected") 1436 @mock.patch.object(store.dbs, "redis_connected")
1451 @mock.patch.object(store.dbs, "redis") 1437 @mock.patch.object(store.dbs, "redis")
@@ -1551,7 +1537,8 @@ class PortfolioTest(WebMockTestCase):
1551 1537
1552 @mock.patch.object(market.time, "sleep") 1538 @mock.patch.object(market.time, "sleep")
1553 @mock.patch.object(market.Portfolio, "get_cryptoportfolio") 1539 @mock.patch.object(market.Portfolio, "get_cryptoportfolio")
1554 def test_wait_for_recent(self, get_cryptoportfolio, sleep): 1540 @mock.patch.object(market.Portfolio, "next_wait_time")
1541 def test_wait_for_recent(self, next_wait_time, get_cryptoportfolio, sleep):
1555 self.call_count = 0 1542 self.call_count = 0
1556 def _get(refetch=False): 1543 def _get(refetch=False):
1557 if self.call_count != 0: 1544 if self.call_count != 0:
@@ -1563,6 +1550,7 @@ class PortfolioTest(WebMockTestCase):
1563 - store.datetime.timedelta(10)\ 1550 - store.datetime.timedelta(10)\
1564 + store.datetime.timedelta(self.call_count)) 1551 + store.datetime.timedelta(self.call_count))
1565 get_cryptoportfolio.side_effect = _get 1552 get_cryptoportfolio.side_effect = _get
1553 next_wait_time.return_value = 30
1566 1554
1567 market.Portfolio.wait_for_recent() 1555 market.Portfolio.wait_for_recent()
1568 sleep.assert_called_with(30) 1556 sleep.assert_called_with(30)
@@ -1608,7 +1596,7 @@ class PortfolioTest(WebMockTestCase):
1608 def test_start_worker(self): 1596 def test_start_worker(self):
1609 with mock.patch.object(store.Portfolio, "wait_for_notification") as notification: 1597 with mock.patch.object(store.Portfolio, "wait_for_notification") as notification:
1610 store.Portfolio.start_worker() 1598 store.Portfolio.start_worker()
1611 notification.assert_called_once_with(poll=30) 1599 notification.assert_called_once_with()
1612 1600
1613 self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__) 1601 self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__)
1614 self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__) 1602 self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__)
@@ -1626,7 +1614,7 @@ class PortfolioTest(WebMockTestCase):
1626 with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\ 1614 with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\
1627 mock.patch.object(store.Portfolio, "report") as report,\ 1615 mock.patch.object(store.Portfolio, "report") as report,\
1628 mock.patch.object(store.time, "sleep") as sleep: 1616 mock.patch.object(store.time, "sleep") as sleep:
1629 store.Portfolio.start_worker(poll=3) 1617 store.Portfolio.start_worker()
1630 store.Portfolio.stop_worker() 1618 store.Portfolio.stop_worker()
1631 store.Portfolio.worker.join() 1619 store.Portfolio.worker.join()
1632 get.assert_not_called() 1620 get.assert_not_called()
@@ -1640,8 +1628,10 @@ class PortfolioTest(WebMockTestCase):
1640 1628
1641 with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\ 1629 with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\
1642 mock.patch.object(store.Portfolio, "report") as report,\ 1630 mock.patch.object(store.Portfolio, "report") as report,\
1631 mock.patch.object(store.Portfolio, "next_wait_time") as wait,\
1643 mock.patch.object(store.time, "sleep") as sleep: 1632 mock.patch.object(store.time, "sleep") as sleep:
1644 store.Portfolio.start_worker(poll=3) 1633 wait.return_value = 3
1634 store.Portfolio.start_worker()
1645 1635
1646 store.Portfolio.worker_notify.set() 1636 store.Portfolio.worker_notify.set()
1647 1637
@@ -1667,4 +1657,24 @@ class PortfolioTest(WebMockTestCase):
1667 worker_notify.set.assert_called_once_with() 1657 worker_notify.set.assert_called_once_with()
1668 callback.wait.assert_called_once_with() 1658 callback.wait.assert_called_once_with()
1669 1659
1660 def test_next_wait_time(self):
1661 with self.subTest("first start"):
1662 self.assertEqual(30, store.Portfolio.next_wait_time())
1663 self.assertIsNotNone(store.Portfolio.poll_started_at)
1664 with self.subTest("25min"):
1665 store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=25)
1666 self.assertEqual(30, store.Portfolio.next_wait_time())
1667 with self.subTest("35min"):
1668 store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=35)
1669 self.assertEqual(60, store.Portfolio.next_wait_time())
1670 with self.subTest("1h15"):
1671 store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=75)
1672 self.assertEqual(300, store.Portfolio.next_wait_time())
1673 with self.subTest("5hours"):
1674 store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(hours=5)
1675 self.assertEqual(3600, store.Portfolio.next_wait_time())
1676 with self.subTest("overdue"), self.assertRaises(Exception):
1677 store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(hours=25)
1678 store.Portfolio.next_wait_time()
1679
1670 1680