]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/commitdiff
Add increasing delay to fetching cryptoportfolio
authorIsmaël Bouya <ismael.bouya@normalesup.org>
Sun, 26 Aug 2018 20:22:03 +0000 (22:22 +0200)
committerIsmaël Bouya <ismael.bouya@normalesup.org>
Sun, 26 Aug 2018 20:22:03 +0000 (22:22 +0200)
store.py
tests/helper.py
tests/test_market.py
tests/test_store.py

index 5533759a2f766159b6d098f77bffd7fa4eedd34f..f987bb9c036b9d6b3bbf63878a43123168514323 100644 (file)
--- a/store.py
+++ b/store.py
@@ -499,13 +499,32 @@ class Portfolio:
     worker_started = False
     worker_notify = None
     callback = None
+    poll_started_at = None
 
     @classmethod
-    def start_worker(cls, poll=30):
+    def next_wait_time(cls):
+        now = datetime.datetime.now()
+        if cls.poll_started_at is None:
+            cls.poll_started_at = now
+        delta = now - cls.poll_started_at
+
+        if delta < datetime.timedelta(minutes=30):
+            return 30
+        elif delta < datetime.timedelta(hours=1):
+            return 60
+        elif delta < datetime.timedelta(hours=4):
+            return 5*60
+        elif delta < datetime.timedelta(days=1):
+            return 60*60
+        else:
+            raise "Too long waiting"
+
+    @classmethod
+    def start_worker(cls):
         import threading
 
         cls.worker = threading.Thread(name="portfolio", daemon=True,
-                target=cls.wait_for_notification, kwargs={"poll": poll})
+                target=cls.wait_for_notification)
         cls.worker_notify = threading.Event()
         cls.callback = threading.Event()
 
@@ -526,7 +545,7 @@ class Portfolio:
             return cls.worker == threading.current_thread()
 
     @classmethod
-    def wait_for_notification(cls, poll=30):
+    def wait_for_notification(cls):
         if not cls.is_worker_thread():
             raise RuntimeError("This method needs to be ran with the worker")
         while cls.worker_started:
@@ -536,7 +555,7 @@ class Portfolio:
                 cls.report.print_log("[Worker] Fetching cryptoportfolio")
                 cls.get_cryptoportfolio(refetch=True)
                 cls.callback.set()
-                time.sleep(poll)
+                time.sleep(cls.next_wait_time())
 
     @classmethod
     def stop_worker(cls):
@@ -550,11 +569,11 @@ class Portfolio:
         cls.callback.wait()
 
     @classmethod
-    def wait_for_recent(cls, delta=4, poll=30):
+    def wait_for_recent(cls, delta=4):
         cls.get_cryptoportfolio()
         while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta):
             if cls.worker is None:
-                time.sleep(poll)
+                time.sleep(cls.next_wait_time())
                 cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
             cls.get_cryptoportfolio(refetch=True)
 
@@ -585,7 +604,7 @@ class Portfolio:
             cls.data.set(r.json(parse_int=D, parse_float=D))
             cls.parse_cryptoportfolio()
             cls.store_cryptoportfolio()
-        except (JSONDecodeError, SimpleJSONDecodeError):
+        except (AssertionError, JSONDecodeError, SimpleJSONDecodeError):
             cls.data.set(None)
             cls.last_date.set(None)
             cls.liquidities.set({})
@@ -648,6 +667,8 @@ class Portfolio:
         high_liquidity = parse_weights(cls.data.get("portfolio_1"))
         medium_liquidity = parse_weights(cls.data.get("portfolio_2"))
 
+        assert len(high_liquidity) > 0
+        assert len(medium_liquidity) > 0
         cls.liquidities.set({
             "medium": medium_liquidity,
             "high":   high_liquidity,
index 935e0601f1ad9bbd82433ea841fd89474f1ad2c5..a2f8a2243c5e6966a96ccda794741291970d201a 100644 (file)
@@ -45,6 +45,7 @@ class WebMockTestCase(unittest.TestCase):
                     worker_tag="",
                     worker_notify=None,
                     worker_started=False,
+                    poll_started_at=None,
                     callback=None),
                 mock.patch.multiple(portfolio.Computation,
                     computations=portfolio.Computation.computations),
index 49d159c3a3d1eae6e57d5ae67b05681ddd123ead..e6e6f36f9459468dbcb9de4583cd3108f3c3d0d6 100644 (file)
@@ -1033,7 +1033,7 @@ class ProcessorTest(WebMockTestCase):
 
         method, arguments = processor.method_arguments("wait_for_recent")
         self.assertEqual(market.Portfolio.wait_for_recent, method)
-        self.assertEqual(["delta", "poll"], arguments)
+        self.assertEqual(["delta"], arguments)
 
         method, arguments = processor.method_arguments("prepare_trades")
         self.assertEqual(m.prepare_trades, method)
index 3097f6d5d49d30f97bea413c3138cac6c21683ce..4ab9bdf2f1587b199b146fbf203edd6a6bde9246 100644 (file)
@@ -1430,22 +1430,8 @@ class PortfolioTest(WebMockTestCase):
             del(data["portfolio_2"]["weights"])
             market.Portfolio.data = store.LockedVar(data)
 
-            market.Portfolio.parse_cryptoportfolio()
-            self.assertListEqual(
-                    ["medium", "high"],
-                    list(market.Portfolio.liquidities.get().keys()))
-            self.assertEqual({}, market.Portfolio.liquidities.get("medium"))
-
-        with self.subTest(description="All missing weights"):
-            data = store.json.loads(self.json_response, parse_int=D, parse_float=D)
-            del(data["portfolio_1"]["weights"])
-            del(data["portfolio_2"]["weights"])
-            market.Portfolio.data = store.LockedVar(data)
-
-            market.Portfolio.parse_cryptoportfolio()
-            self.assertEqual({}, market.Portfolio.liquidities.get("medium"))
-            self.assertEqual({}, market.Portfolio.liquidities.get("high"))
-            self.assertEqual(datetime.datetime(1,1,1), market.Portfolio.last_date.get())
+            with self.assertRaises(AssertionError):
+                market.Portfolio.parse_cryptoportfolio()
 
     @mock.patch.object(store.dbs, "redis_connected")
     @mock.patch.object(store.dbs, "redis")
@@ -1551,7 +1537,8 @@ class PortfolioTest(WebMockTestCase):
 
     @mock.patch.object(market.time, "sleep")
     @mock.patch.object(market.Portfolio, "get_cryptoportfolio")
-    def test_wait_for_recent(self, get_cryptoportfolio, sleep):
+    @mock.patch.object(market.Portfolio, "next_wait_time")
+    def test_wait_for_recent(self, next_wait_time, get_cryptoportfolio, sleep):
         self.call_count = 0
         def _get(refetch=False):
             if self.call_count != 0:
@@ -1563,6 +1550,7 @@ class PortfolioTest(WebMockTestCase):
                 - store.datetime.timedelta(10)\
                 + store.datetime.timedelta(self.call_count))
         get_cryptoportfolio.side_effect = _get
+        next_wait_time.return_value = 30
 
         market.Portfolio.wait_for_recent()
         sleep.assert_called_with(30)
@@ -1608,7 +1596,7 @@ class PortfolioTest(WebMockTestCase):
     def test_start_worker(self):
         with mock.patch.object(store.Portfolio, "wait_for_notification") as notification:
             store.Portfolio.start_worker()
-            notification.assert_called_once_with(poll=30)
+            notification.assert_called_once_with()
 
             self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__)
             self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__)
@@ -1626,7 +1614,7 @@ class PortfolioTest(WebMockTestCase):
         with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\
                 mock.patch.object(store.Portfolio, "report") as report,\
                 mock.patch.object(store.time, "sleep") as sleep:
-            store.Portfolio.start_worker(poll=3)
+            store.Portfolio.start_worker()
             store.Portfolio.stop_worker()
             store.Portfolio.worker.join()
             get.assert_not_called()
@@ -1640,8 +1628,10 @@ class PortfolioTest(WebMockTestCase):
 
         with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\
                 mock.patch.object(store.Portfolio, "report") as report,\
+                mock.patch.object(store.Portfolio, "next_wait_time") as wait,\
                 mock.patch.object(store.time, "sleep") as sleep:
-            store.Portfolio.start_worker(poll=3)
+            wait.return_value = 3
+            store.Portfolio.start_worker()
 
             store.Portfolio.worker_notify.set()
 
@@ -1667,4 +1657,24 @@ class PortfolioTest(WebMockTestCase):
             worker_notify.set.assert_called_once_with()
             callback.wait.assert_called_once_with()
 
+    def test_next_wait_time(self):
+        with self.subTest("first start"):
+            self.assertEqual(30, store.Portfolio.next_wait_time())
+            self.assertIsNotNone(store.Portfolio.poll_started_at)
+        with self.subTest("25min"):
+            store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=25)
+            self.assertEqual(30, store.Portfolio.next_wait_time())
+        with self.subTest("35min"):
+            store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=35)
+            self.assertEqual(60, store.Portfolio.next_wait_time())
+        with self.subTest("1h15"):
+            store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=75)
+            self.assertEqual(300, store.Portfolio.next_wait_time())
+        with self.subTest("5hours"):
+            store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(hours=5)
+            self.assertEqual(3600, store.Portfolio.next_wait_time())
+        with self.subTest("overdue"), self.assertRaises(Exception):
+            store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(hours=25)
+            store.Portfolio.next_wait_time()
+