]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blobdiff - store.py
Add parallelization
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / store.py
index c8cdc42b8b3a290974035869522f807b39c205d6..f655be538b66f3d81d7fb121a0595ee4ff63ddc7 100644 (file)
--- a/store.py
+++ b/store.py
@@ -309,30 +309,110 @@ class TradeStore:
         for order in self.all_orders(state="open"):
             order.get_status()
 
+class NoopLock:
+    def __enter__(self, *args):
+        pass
+    def __exit__(self, *args):
+        pass
+
+class LockedVar:
+    def __init__(self, value):
+        self.lock = NoopLock()
+        self.val = value
+
+    def start_lock(self):
+        import threading
+        self.lock = threading.Lock()
+
+    def set(self, value):
+        with self.lock:
+            self.val = value
+
+    def get(self, key=None):
+        with self.lock:
+            if key is not None and isinstance(self.val, dict):
+                return self.val.get(key)
+            else:
+                return self.val
+
+    def __getattr__(self, key):
+        with self.lock:
+            return getattr(self.val, key)
+
 class Portfolio:
     URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
-    liquidities = {}
-    data = None
-    last_date = None
-    report = ReportStore(None)
+    data = LockedVar(None)
+    liquidities = LockedVar({})
+    last_date = LockedVar(None)
+    report = LockedVar(ReportStore(None))
+    worker = None
+    worker_started = False
+    worker_notify = None
+    callback = None
+
+    @classmethod
+    def start_worker(cls, poll=30):
+        import threading
+
+        cls.worker = threading.Thread(name="portfolio", daemon=True,
+                target=cls.wait_for_notification, kwargs={"poll": poll})
+        cls.worker_notify = threading.Event()
+        cls.callback = threading.Event()
+
+        cls.last_date.start_lock()
+        cls.liquidities.start_lock()
+        cls.report.start_lock()
+
+        cls.worker_started = True
+        cls.worker.start()
+
+    @classmethod
+    def is_worker_thread(cls):
+        if cls.worker is None:
+            return False
+        else:
+            import threading
+            return cls.worker == threading.current_thread()
+
+    @classmethod
+    def wait_for_notification(cls, poll=30):
+        if not cls.is_worker_thread():
+            raise RuntimeError("This method needs to be ran with the worker")
+        while cls.worker_started:
+            cls.worker_notify.wait()
+            cls.worker_notify.clear()
+            cls.report.print_log("Fetching cryptoportfolio")
+            cls.get_cryptoportfolio(refetch=True)
+            cls.callback.set()
+            time.sleep(poll)
 
     @classmethod
-    def wait_for_recent(cls, delta=4):
+    def notify_and_wait(cls):
+        cls.callback.clear()
+        cls.worker_notify.set()
+        cls.callback.wait()
+
+    @classmethod
+    def wait_for_recent(cls, delta=4, poll=30):
         cls.get_cryptoportfolio()
-        while cls.last_date is None or datetime.now() - cls.last_date > timedelta(delta):
-            time.sleep(30)
-            cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
+        while cls.last_date.get() is None or datetime.now() - cls.last_date.get() > timedelta(delta):
+            if cls.worker is None:
+                time.sleep(poll)
+                cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
             cls.get_cryptoportfolio(refetch=True)
 
     @classmethod
     def repartition(cls, liquidity="medium"):
         cls.get_cryptoportfolio()
-        liquidities = cls.liquidities[liquidity]
-        return liquidities[cls.last_date]
+        liquidities = cls.liquidities.get(liquidity)
+        return liquidities[cls.last_date.get()]
 
     @classmethod
     def get_cryptoportfolio(cls, refetch=False):
-        if cls.data is not None and not refetch:
+        if cls.data.get() is not None and not refetch:
+            return
+        if cls.worker is not None and not cls.is_worker_thread():
+            cls.notify_and_wait()
             return
         try:
             r = requests.get(cls.URL)
@@ -342,11 +422,12 @@ class Portfolio:
             cls.report.log_error("get_cryptoportfolio", exception=e)
             return
         try:
-            cls.data = r.json(parse_int=D, parse_float=D)
+            cls.data.set(r.json(parse_int=D, parse_float=D))
             cls.parse_cryptoportfolio()
         except (JSONDecodeError, SimpleJSONDecodeError):
-            cls.data = None
-            cls.liquidities = {}
+            cls.data.set(None)
+            cls.last_date.set(None)
+            cls.liquidities.set({})
 
     @classmethod
     def parse_cryptoportfolio(cls):
@@ -366,6 +447,8 @@ class Portfolio:
             return clean_weights_
 
         def parse_weights(portfolio_hash):
+            if "weights" not in portfolio_hash:
+                return {}
             weights_hash = portfolio_hash["weights"]
             weights = {}
             for i in range(len(weights_hash["_row"])):
@@ -375,13 +458,16 @@ class Portfolio:
                         map(clean_weights(i), weights_hash.items())))
             return weights
 
-        high_liquidity = parse_weights(cls.data["portfolio_1"])
-        medium_liquidity = parse_weights(cls.data["portfolio_2"])
+        high_liquidity = parse_weights(cls.data.get("portfolio_1"))
+        medium_liquidity = parse_weights(cls.data.get("portfolio_2"))
 
-        cls.liquidities = {
-                "medium": medium_liquidity,
-                "high":   high_liquidity,
-                }
-        cls.last_date = max(max(medium_liquidity.keys()), max(high_liquidity.keys()))
+        cls.liquidities.set({
+            "medium": medium_liquidity,
+            "high":   high_liquidity,
+            })
+        cls.last_date.set(max(
+            max(medium_liquidity.keys(), default=datetime(1, 1, 1)),
+            max(high_liquidity.keys(), default=datetime(1, 1, 1))
+            ))